asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [5/6] asterixdb git commit: Fix upsert deadlock and upsert with filtered primary only
Date Thu, 25 May 2017 06:04:12 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 6684e68..fed4588 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -48,19 +48,19 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -70,7 +70,6 @@ import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
-import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
@@ -121,7 +120,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
 
     @Override
     public void create() throws HyracksDataException {
-        if (isActivated) {
+        if (isActive) {
             throw new HyracksDataException("Failed to create the index since it is activated.");
         }
         fileManager.deleteDirs();
@@ -131,8 +130,13 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     }
 
     @Override
+    protected ILSMDiskComponent loadComponent(LSMComponentFileReferences refs) throws HyracksDataException {
+        return null;
+    }
+
+    @Override
     public void activate() throws HyracksDataException {
-        if (isActivated) {
+        if (isActive) {
             throw new HyracksDataException("Failed to activate the index since it is already activated.");
         }
 
@@ -149,7 +153,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                 diskComponents.add(component);
                 secondDiskComponents.add(component);
             }
-            ((ExternalIndexHarness) lsmHarness).indexFirstTimeActivated();
+            ((ExternalIndexHarness) getLsmHarness()).indexFirstTimeActivated();
         } else {
             // This index has been opened before or is brand new with no
             // components. It should also maintain the version pointer
@@ -175,66 +179,51 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                 }
             }
         }
-        isActivated = true;
+        isActive = true;
     }
 
     @Override
     public void clear() throws HyracksDataException {
-        if (!isActivated) {
+        if (!isActive) {
             throw new HyracksDataException("Failed to clear the index since it is not activated.");
         }
-        ((ExternalIndexHarness) lsmHarness).indexClear();
-
-        for (ILSMComponent c : diskComponents) {
-            LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-            component.getBTree().deactivate();
-            component.getBuddyBTree().deactivate();
-            component.getBloomFilter().deactivate();
-            component.getBTree().destroy();
-            component.getBloomFilter().destroy();
-            component.getBuddyBTree().destroy();
+        ((ExternalIndexHarness) getLsmHarness()).indexClear();
+        for (ILSMDiskComponent c : diskComponents) {
+            clearDiskComponent(c);
             // Remove from second list to avoid destroying twice
             secondDiskComponents.remove(c);
         }
-
-        for (ILSMComponent c : secondDiskComponents) {
-            LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-            component.getBTree().deactivate();
-            component.getBloomFilter().deactivate();
-            component.getBuddyBTree().deactivate();
-            component.getBTree().destroy();
-            component.getBloomFilter().destroy();
-            component.getBuddyBTree().destroy();
+        for (ILSMDiskComponent c : secondDiskComponents) {
+            clearDiskComponent(c);
         }
-
         diskComponents.clear();
         secondDiskComponents.clear();
         version = 0;
     }
 
     @Override
-    public void deactivate() throws HyracksDataException {
-        deactivate(true);
+    protected void clearDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
+        component.getBTree().deactivate();
+        component.getBuddyBTree().deactivate();
+        component.getBloomFilter().deactivate();
+        component.getBTree().destroy();
+        component.getBloomFilter().destroy();
+        component.getBuddyBTree().destroy();
     }
 
     @Override
     public void destroy() throws HyracksDataException {
-        if (isActivated) {
+        if (isActive) {
             throw new HyracksDataException("Failed to destroy the index since it is activated.");
         }
-        for (ILSMComponent c : diskComponents) {
-            LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-            component.getBTree().destroy();
-            component.getBuddyBTree().destroy();
-            component.getBloomFilter().destroy();
+        for (ILSMDiskComponent c : diskComponents) {
+            destroyDiskComponent(c);
             // Remove from second list to avoid destroying twice
             secondDiskComponents.remove(c);
         }
-        for (ILSMComponent c : secondDiskComponents) {
-            LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-            component.getBTree().destroy();
-            component.getBuddyBTree().destroy();
-            component.getBloomFilter().destroy();
+        for (ILSMDiskComponent c : secondDiskComponents) {
+            destroyDiskComponent(c);
         }
         diskComponents.clear();
         secondDiskComponents.clear();
@@ -243,19 +232,18 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     }
 
     @Override
-    public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
-            ISearchOperationCallback searchCallback) throws HyracksDataException {
-        return new LSMBTreeWithBuddyAccessor(lsmHarness, createOpContext(searchCallback, version));
-    }
-
-    @Override
-    public void validate() throws HyracksDataException {
-        throw new UnsupportedOperationException("Validation not implemented for LSM B-Trees with Buddy B-Tree.");
+    protected void destroyDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
+        component.getBTree().destroy();
+        component.getBuddyBTree().destroy();
+        component.getBloomFilter().destroy();
     }
 
     @Override
-    public long getMemoryAllocationSize() {
-        return 0;
+    public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) throws HyracksDataException {
+        return new LSMTreeIndexAccessor(getLsmHarness(), createOpContext(searchCallback, version),
+                ctx -> new LSMBTreeWithBuddySearchCursor(ctx, buddyBTreeFields));
     }
 
     // The subsume merged components is overridden to account for:
@@ -288,16 +276,16 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
 
     // For initial load
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex) throws HyracksDataException {
-        return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, 0, checkIfEmptyIndex, false);
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws HyracksDataException {
+        return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, 0, false);
     }
 
     // For transaction bulk load <- could consolidate with the above method ->
     @Override
-    public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex) throws HyracksDataException {
-        return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, numElementsHint, checkIfEmptyIndex, true);
+    public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws HyracksDataException {
+        return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, numElementsHint, true);
     }
 
     @Override
@@ -345,7 +333,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
         ITreeIndexCursor cursor = new LSMBTreeWithBuddySortedCursor(bctx, buddyBTreeFields);
         LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
-        ILSMIndexAccessor accessor = new LSMBTreeWithBuddyAccessor(lsmHarness, bctx);
+        ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), bctx,
+                opCtx -> new LSMBTreeWithBuddySearchCursor(opCtx, buddyBTreeFields));
 
         // Since we have two lists of components, to tell whether we need to
         // keep deleted tuples, we need to know
@@ -367,22 +356,21 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     // This method creates the appropriate opContext for the targeted version
     public ExternalBTreeWithBuddyOpContext createOpContext(ISearchOperationCallback searchCallback, int targetVersion) {
         return new ExternalBTreeWithBuddyOpContext(btreeCmpFactories, buddyBtreeCmpFactories, searchCallback,
-                targetVersion, lsmHarness, btreeInteriorFrameFactory, btreeLeafFrameFactory,
+                targetVersion, getLsmHarness(), btreeInteriorFrameFactory, btreeLeafFrameFactory,
                 buddyBtreeLeafFrameFactory);
     }
 
     @Override
     public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
         LSMBTreeWithBuddyMergeOperation mergeOp = (LSMBTreeWithBuddyMergeOperation) operation;
-        ITreeIndexCursor cursor = mergeOp.getCursor();
+        IIndexCursor cursor = mergeOp.getCursor();
         ISearchPredicate btreeSearchPred = new RangePredicate(null, null, true, true, null, null);
         ILSMIndexOperationContext opCtx = ((LSMBTreeWithBuddySortedCursor) cursor).getOpCtx();
         opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
         search(opCtx, cursor, btreeSearchPred);
 
-        LSMBTreeWithBuddyDiskComponent mergedComponent =
-                createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(), mergeOp.getBuddyBTreeMergeTarget(),
-                        mergeOp.getBloomFilterMergeTarget(), true);
+        LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
+                mergeOp.getBuddyBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
 
         // In case we must keep the deleted-keys BuddyBTrees, then they must be
         // merged *before* merging the b-trees so that
@@ -516,40 +504,25 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
 
     @Override
     public void deactivate(boolean flushOnExit) throws HyracksDataException {
-        if (!isActivated) {
+        if (!isActive) {
             throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
         }
-
         if (flushOnExit) {
-            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
-            cb.afterFinalize(LSMOperationType.FLUSH, null);
+            ioOpCallback.afterFinalize(LSMOperationType.FLUSH, null);
         }
         // Even though, we deactivate the index, we don't exit components or
         // modify any of the lists to make sure they
         // are there if the index was opened again
-
-        for (ILSMComponent c : diskComponents) {
-            LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-            BTree btree = component.getBTree();
-            BTree buddyBtree = component.getBuddyBTree();
-            BloomFilter bloomFilter = component.getBloomFilter();
-            btree.deactivateCloseHandle();
-            buddyBtree.deactivateCloseHandle();
-            bloomFilter.deactivate();
+        for (ILSMDiskComponent c : diskComponents) {
+            deactivateDiskComponent(c);
         }
-        for (ILSMComponent c : secondDiskComponents) {
+        for (ILSMDiskComponent c : secondDiskComponents) {
             // Only deactivate non shared components
             if (!diskComponents.contains(c)) {
-                LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
-                BTree btree = component.getBTree();
-                BTree buddyBtree = component.getBuddyBTree();
-                BloomFilter bloomFilter = component.getBloomFilter();
-                btree.deactivateCloseHandle();
-                buddyBtree.deactivateCloseHandle();
-                bloomFilter.deactivate();
+                deactivateDiskComponent(c);
             }
         }
-        isActivated = false;
+        isActive = false;
     }
 
     @Override
@@ -611,26 +584,6 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     // accessor will try to do it
     // we could throw the exception here but we don't. it will eventually be
     // thrown by the index itself
-    public class LSMBTreeWithBuddyAccessor extends LSMTreeIndexAccessor {
-        public LSMBTreeWithBuddyAccessor(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) {
-            super(lsmHarness, ctx);
-        }
-
-        @Override
-        public ITreeIndexCursor createSearchCursor(boolean exclusive) {
-            return new LSMBTreeWithBuddySearchCursor(ctx, buddyBTreeFields);
-        }
-
-        public MultiComparator getBTreeMultiComparator() {
-            ExternalBTreeWithBuddyOpContext concreteCtx = (ExternalBTreeWithBuddyOpContext) ctx;
-            return concreteCtx.getBTreeMultiComparator();
-        }
-
-        public MultiComparator getBodyBTreeMultiComparator() {
-            ExternalBTreeWithBuddyOpContext concreteCtx = (ExternalBTreeWithBuddyOpContext) ctx;
-            return concreteCtx.getBuddyBTreeMultiComparator();
-        }
-    }
 
     // The bulk loader used for both initial loading and transaction
     // modifications
@@ -645,15 +598,12 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
         private final boolean isTransaction;
 
         public LSMTwoPCBTreeWithBuddyBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
-                boolean checkIfEmptyIndex, boolean isTransaction) throws HyracksDataException {
+                boolean isTransaction) throws HyracksDataException {
             this.isTransaction = isTransaction;
             // Create the appropriate target
             if (isTransaction) {
                 component = createTransactionTarget();
             } else {
-                if (checkIfEmptyIndex && !isEmptyIndex()) {
-                    throw HyracksDataException.create(ErrorCode.LOAD_NON_EMPTY_INDEX);
-                }
                 component = createBulkLoadTarget();
             }
 
@@ -730,7 +680,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                     buddyBtree.deactivate();
                     bloomFilter.deactivate();
                 } else {
-                    lsmHarness.addBulkLoadedComponent(component);
+                    getLsmHarness().addBulkLoadedComponent(component);
                 }
             }
         }
@@ -781,7 +731,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     @Override
     public ILSMIndexAccessor createAccessor(ISearchOperationCallback searchCallback, int targetIndexVersion)
             throws HyracksDataException {
-        return new LSMBTreeWithBuddyAccessor(lsmHarness, createOpContext(searchCallback, targetIndexVersion));
+        return new LSMTreeIndexAccessor(getLsmHarness(), createOpContext(searchCallback, targetIndexVersion),
+                ctx -> new LSMBTreeWithBuddySearchCursor(ctx, buddyBTreeFields));
     }
 
     // This function in an instance of this index is only used after a bulk load
@@ -825,7 +776,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                     componentFileRefrences.getDeleteIndexFileReference(),
                     componentFileRefrences.getBloomFilterFileReference(), false);
         }
-        ((ExternalIndexHarness) lsmHarness).addTransactionComponents(component);
+        ((ExternalIndexHarness) getLsmHarness()).addTransactionComponents(component);
     }
 
     @Override
@@ -859,7 +810,74 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
     }
 
     @Override
-    public void allocateMemoryComponents() throws HyracksDataException {
+    protected void deactivateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
         //do nothing since external index never use memory components
     }
+
+    @Override
+    protected void deactivateDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) c;
+        BTree btree = component.getBTree();
+        BTree buddyBtree = component.getBuddyBTree();
+        BloomFilter bloomFilter = component.getBloomFilter();
+        btree.deactivateCloseHandle();
+        buddyBtree.deactivateCloseHandle();
+        bloomFilter.deactivate();
+    }
+
+    @Override
+    protected void destroyMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        //do nothing since external index never use memory components
+    }
+
+    @Override
+    protected void clearMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        //do nothing since external index never use memory components
+    }
+
+    @Override
+    protected long getMemoryComponentSize(ILSMMemoryComponent c) {
+        return 0;
+    }
+
+    @Override
+    protected void validateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        throw new UnsupportedOperationException("Validation not implemented for LSM B-Trees with Buddy B-Tree.");
+    }
+
+    @Override
+    protected void validateDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        throw new UnsupportedOperationException("Validation not implemented for LSM B-Trees with Buddy B-Tree.");
+    }
+
+    @Override
+    protected void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        //do nothing since external index never use memory components
+    }
+
+    @Override
+    protected LSMComponentFileReferences getMergeFileReferences(ILSMDiskComponent firstComponent,
+            ILSMDiskComponent lastComponent) throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    protected AbstractLSMIndexOperationContext createOpContext(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return null;
+    }
+
+    @Override
+    protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
+            ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
+            ILSMIOOperationCallback callback) throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
+            List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
+            ILSMIOOperationCallback callback) throws HyracksDataException {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
index ce18b20..d5cd2e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
@@ -18,42 +18,26 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
-import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationContext {
-    private IndexOperation op;
     private MultiComparator bTreeCmp;
     private MultiComparator buddyBTreeCmp;
-    private final List<ILSMComponent> componentHolder;
-    private final List<ILSMDiskComponent> componentsToBeMerged;
-    private final List<ILSMDiskComponent> componentsToBeReplicated;
-    private final ISearchOperationCallback searchCallback;
     private final int targetIndexVersion;
-    private ISearchPredicate searchPredicate;
     private LSMBTreeWithBuddyCursorInitialState searchInitialState;
 
     public ExternalBTreeWithBuddyOpContext(IBinaryComparatorFactory[] btreeCmpFactories,
             IBinaryComparatorFactory[] buddyBtreeCmpFactories, ISearchOperationCallback searchCallback,
             int targetIndexVersion, ILSMHarness lsmHarness, ITreeIndexFrameFactory btreeInteriorFrameFactory,
             ITreeIndexFrameFactory btreeLeafFrameFactory, ITreeIndexFrameFactory buddyBtreeLeafFrameFactory) {
-        this.componentHolder = new LinkedList<>();
-        this.componentsToBeMerged = new LinkedList<>();
-        this.componentsToBeReplicated = new LinkedList<>();
-        this.searchCallback = searchCallback;
+        super(null, null, null, searchCallback, null);
         this.targetIndexVersion = targetIndexVersion;
         this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
         this.buddyBTreeCmp = MultiComparator.create(buddyBtreeCmpFactories);
@@ -63,29 +47,10 @@ public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationCo
     }
 
     @Override
-    public void setOperation(IndexOperation newOp) {
-        reset();
-        this.op = newOp;
-    }
-
-    @Override
     public void setCurrentMutableComponentId(int currentMutableComponentId) {
         // Do nothing. this should never be called for disk only indexes
     }
 
-    @Override
-    public void reset() {
-        super.reset();
-        componentHolder.clear();
-        componentsToBeMerged.clear();
-        componentsToBeReplicated.clear();
-    }
-
-    @Override
-    public IndexOperation getOperation() {
-        return op;
-    }
-
     public MultiComparator getBTreeMultiComparator() {
         return bTreeCmp;
     }
@@ -94,46 +59,16 @@ public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationCo
         return buddyBTreeCmp;
     }
 
-    @Override
-    public List<ILSMComponent> getComponentHolder() {
-        return componentHolder;
-    }
-
-    @Override
-    public ISearchOperationCallback getSearchOperationCallback() {
-        return searchCallback;
-    }
-
     // This should never be needed for disk only indexes
     @Override
     public IModificationOperationCallback getModificationCallback() {
         return null;
     }
 
-    @Override
-    public List<ILSMDiskComponent> getComponentsToBeMerged() {
-        return componentsToBeMerged;
-    }
-
     public int getTargetIndexVersion() {
         return targetIndexVersion;
     }
 
-    @Override
-    public void setSearchPredicate(ISearchPredicate searchPredicate) {
-        this.searchPredicate = searchPredicate;
-    }
-
-    @Override
-    public ISearchPredicate getSearchPredicate() {
-        return searchPredicate;
-    }
-
-    @Override
-    public List<ILSMDiskComponent> getComponentsToBeReplicated() {
-        return componentsToBeReplicated;
-    }
-
     public LSMBTreeWithBuddyCursorInitialState getSearchInitialState() {
         return searchInitialState;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 1c99d5a..29532d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -36,40 +36,36 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.IPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.freepage.VirtualFreePageManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -77,12 +73,12 @@ import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
-import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
 
+    private static final ICursorFactory cursorFactory = opCtx -> new LSMBTreeSearchCursor(opCtx);
     // For creating BTree's used in flush and merge.
     protected final LSMBTreeDiskComponentFactory componentFactory;
     // For creating BTree's used in bulk load. Different from diskBTreeFactory
@@ -95,7 +91,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     protected final IBinaryComparatorFactory[] cmpFactories;
 
     private final boolean needKeyDupCheck;
-    private final int[] btreeFields;
+
     // Primary LSMBTree has a Bloomfilter, but Secondary one doesn't have.
     private final boolean hasBloomFilter;
 
@@ -111,7 +107,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             int[] btreeFields, int[] filterFields, boolean durable) throws HyracksDataException {
         super(ioManager, virtualBufferCaches, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider,
                 bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, ioOpCallback, filterFrameFactory,
-                filterManager, filterFields, durable);
+                filterManager, filterFields, durable, filterFactory, btreeFields);
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.cmpFactories = cmpFactories;
@@ -131,7 +127,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
         bulkLoadComponentFactory =
                 new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory, filterFactory);
         this.needKeyDupCheck = needKeyDupCheck;
-        this.btreeFields = btreeFields;
         this.hasBloomFilter = needKeyDupCheck;
     }
 
@@ -151,172 +146,62 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
         componentFactory = new LSMBTreeDiskComponentFactory(diskBTreeFactory, bloomFilterFactory, null);
         bulkLoadComponentFactory = new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory, null);
         this.needKeyDupCheck = needKeyDupCheck;
-        this.btreeFields = null;
         //TODO remove BloomFilter from external dataset's secondary LSMBTree index
         this.hasBloomFilter = true;
     }
 
-    @Override
-    public synchronized void create() throws HyracksDataException {
-        if (isActivated) {
-            throw new HyracksDataException("Failed to create the index since it is activated.");
-        }
-        // Why delete is part of the create??
-        fileManager.deleteDirs();
-        fileManager.createDirs();
-        diskComponents.clear();
+    public boolean hasBloomFilter() {
+        return hasBloomFilter;
     }
 
     @Override
-    public synchronized void activate() throws HyracksDataException {
-        if (isActivated) {
-            throw new HyracksDataException("Failed to activate the index since it is already activated.");
-        }
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        immutableComponents.clear();
-        List<LSMComponentFileReferences> validFileReferences;
-        validFileReferences = fileManager.cleanupAndGetValidFiles();
-        for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
-            LSMBTreeDiskComponent component;
-            component = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
-                    lsmComonentFileReference.getBloomFilterFileReference(), false);
-            immutableComponents.add(component);
-        }
-        isActivated = true;
+    public boolean isPrimaryIndex() {
+        return needKeyDupCheck;
     }
 
     @Override
-    public synchronized void deactivate(boolean flushOnExit) throws HyracksDataException {
-        if (!isActivated) {
-            throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
-        }
-
-        if (flushOnExit) {
-            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
-            ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(cb);
-            try {
-                cb.waitForIO();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMDiskComponent c : immutableComponents) {
-            LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
-            component.getBTree().deactivateCloseHandle();
-            if (hasBloomFilter) {
-                component.getBloomFilter().deactivate();
-            }
-        }
-        deallocateMemoryComponents();
-        isActivated = false;
+    public IBinaryComparatorFactory[] getComparatorFactories() {
+        return cmpFactories;
     }
 
     @Override
-    public synchronized void deactivate() throws HyracksDataException {
-        deactivate(true);
+    protected ILSMDiskComponent loadComponent(LSMComponentFileReferences lsmComonentFileReferences)
+            throws HyracksDataException {
+        return createDiskComponent(componentFactory, lsmComonentFileReferences.getInsertIndexFileReference(),
+                lsmComonentFileReferences.getBloomFilterFileReference(), false);
     }
 
     @Override
-    public void destroy() throws HyracksDataException {
-        if (isActivated) {
-            throw new HyracksDataException("Failed to destroy the index since it is activated.");
-        }
-
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMDiskComponent c : immutableComponents) {
-            LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
-            component.getBTree().destroy();
-            if (hasBloomFilter) {
-                component.getBloomFilter().destroy();
-            }
-        }
-        for (ILSMComponent c : memoryComponents) {
-            LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-            mutableComponent.getBTree().destroy();
+    protected void destroyDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
+        component.getBTree().destroy();
+        if (hasBloomFilter) {
+            component.getBloomFilter().destroy();
         }
-        fileManager.deleteDirs();
     }
 
     @Override
-    public void clear() throws HyracksDataException {
-        if (!isActivated) {
-            throw new HyracksDataException("Failed to clear the index since it is not activated.");
-        }
-
-        clearMemoryComponents();
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMComponent c : immutableComponents) {
-            LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
-            if (hasBloomFilter) {
-                component.getBloomFilter().deactivate();
-            }
-            component.getBTree().deactivate();
-            if (hasBloomFilter) {
-                component.getBloomFilter().destroy();
-            }
-            component.getBTree().destroy();
-        }
-        immutableComponents.clear();
+    protected void destroyMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        mutableComponent.getBTree().destroy();
     }
 
     @Override
-    public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        int cmc = currentMutableComponentId.get();
-        ctx.setCurrentMutableComponentId(cmc);
-        operationalComponents.clear();
-        switch (ctx.getOperation()) {
-            case UPDATE:
-            case PHYSICALDELETE:
-            case FLUSH:
-            case DELETE:
-            case UPSERT:
-                operationalComponents.add(memoryComponents.get(cmc));
-                break;
-            case INSERT:
-                addOperationalMutableComponents(operationalComponents);
-                operationalComponents.addAll(immutableComponents);
-                break;
-            case SEARCH:
-                if (memoryComponentsAllocated) {
-                    addOperationalMutableComponents(operationalComponents);
-                }
-                if (filterManager != null) {
-                    for (ILSMComponent c : immutableComponents) {
-                        if (c.getLSMComponentFilter().satisfy(
-                                ((AbstractSearchPredicate) ctx.getSearchPredicate()).getMinFilterTuple(),
-                                ((AbstractSearchPredicate) ctx.getSearchPredicate()).getMaxFilterTuple(),
-                                ((LSMBTreeOpContext) ctx).getFilterCmp())) {
-                            operationalComponents.add(c);
-                        }
-                    }
-                } else {
-                    operationalComponents.addAll(immutableComponents);
-                }
-
-                break;
-            case MERGE:
-                operationalComponents.addAll(ctx.getComponentsToBeMerged());
-                break;
-            case FULL_MERGE:
-                operationalComponents.addAll(immutableComponents);
-                break;
-            case REPLICATE:
-                operationalComponents.addAll(ctx.getComponentsToBeReplicated());
-                break;
-            default:
-                throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
+    protected void clearDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
+        if (hasBloomFilter) {
+            component.getBloomFilter().deactivate();
         }
+        component.getBTree().deactivate();
+        if (hasBloomFilter) {
+            component.getBloomFilter().destroy();
+        }
+        component.getBTree().destroy();
     }
 
     @Override
     public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
         LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
-
         ITupleReference indexTuple;
         if (ctx.getIndexTuple() != null) {
             ctx.getIndexTuple().reset(tuple);
@@ -336,11 +221,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
                 ctx.getCurrentMutableBTreeAccessor().upsert(indexTuple);
                 break;
         }
-        if (ctx.getFilterTuple() != null) {
-            ctx.getFilterTuple().reset(tuple);
-            memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(ctx.getFilterTuple(),
-                    ctx.getFilterCmp());
-        }
+        updateFilter(ctx, tuple);
     }
 
     private boolean insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException {
@@ -403,21 +284,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
-            throws HyracksDataException {
-        ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
-        LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
-        LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        assert ctx.getComponentHolder().size() == 1;
-        opCtx.setOperation(IndexOperation.FLUSH);
-        opCtx.getComponentHolder().add(flushingComponent);
-        ILSMIndexAccessor flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
-        ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent,
-                componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir()));
-    }
-
-    @Override
     public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
         LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
         LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent();
@@ -446,8 +312,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
 
-        LSMBTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
-                flushOp.getBloomFilterFlushTarget(), true);
+        LSMBTreeDiskComponent component =
+                createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBloomFilterTarget(), true);
         IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements, false);
         IIndexBulkLoader builder = null;
         if (hasBloomFilter) {
@@ -476,8 +342,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             List<ITupleReference> filterTuples = new ArrayList<>();
             filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
             filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-            filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
-            filterManager.writeFilter(component.getLSMComponentFilter(), component.getBTree());
+            getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+            getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getBTree());
         }
         // Write metadata from memory component to disk
         // Q. what about the merge operation? how do we resolve conflicts
@@ -492,33 +358,9 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
-            throws HyracksDataException {
-        LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        opCtx.setOperation(IndexOperation.MERGE);
-        List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
-        boolean returnDeletedTuples = false;
-        if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents
-                .get(diskComponents.size() - 1)) {
-            returnDeletedTuples = true;
-        }
-        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
-        BTree firstBTree = ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
-        BTree lastBTree = ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1)).getBTree();
-        FileReference firstFile = firstBTree.getFileReference();
-        FileReference lastFile = lastBTree.getFileReference();
-        LSMComponentFileReferences relMergeFileRefs =
-                fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
-        ILSMIndexAccessor accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
-        ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
-                relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir()));
-    }
-
-    @Override
     public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
         LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
-        ITreeIndexCursor cursor = mergeOp.getCursor();
+        IIndexCursor cursor = mergeOp.getCursor();
         RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
         ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
         opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
@@ -535,8 +377,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
             bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
-        LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
-                mergeOp.getBloomFilterMergeTarget(), true);
+        LSMBTreeDiskComponent mergedComponent =
+                createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBloomFilterTarget(), true);
 
         IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
         IIndexBulkLoader builder = null;
@@ -565,8 +407,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
                 filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
                 filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
             }
-            filterManager.updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
-            filterManager.writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
+            getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
+            getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
         }
         bulkLoader.end();
         return mergedComponent;
@@ -590,15 +432,15 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             component.getBloomFilter().activate();
         }
         if (component.getLSMComponentFilter() != null && !createComponent) {
-            filterManager.readFilter(component.getLSMComponentFilter(), component.getBTree());
+            getFilterManager().readFilter(component.getLSMComponentFilter(), component.getBTree());
         }
         return component;
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex) throws HyracksDataException {
-        return new LSMBTreeBulkLoader(fillLevel, verifyInput, numElementsHint, checkIfEmptyIndex);
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws HyracksDataException {
+        return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);
     }
 
     protected ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
@@ -618,168 +460,32 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
         markAsValidInternal(component.getBTree());
     }
 
-    public class LSMBTreeBulkLoader implements IIndexBulkLoader {
-        private final ILSMDiskComponent component;
-        private final BTreeBulkLoader bulkLoader;
-        private final IIndexBulkLoader builder;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        private boolean endedBloomFilterLoad = false;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
-
-        public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
-                boolean checkIfEmptyIndex) throws HyracksDataException {
-            if (checkIfEmptyIndex && !isEmptyIndex()) {
-                throw HyracksDataException.create(ErrorCode.LOAD_NON_EMPTY_INDEX);
-            }
-            component = createBulkLoadTarget();
-            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
-
-            if (hasBloomFilter) {
-                int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-                BloomFilterSpecification bloomFilterSpec =
-                        BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-                builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
-                        bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-            } else {
-                builder = null;
-            }
-
-            if (filterFields != null) {
-                indexTuple = new PermutingTupleReference(btreeFields);
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(filterFields);
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
-        }
-
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-                if (hasBloomFilter) {
-                    builder.add(t);
-                }
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                if (hasBloomFilter && !endedBloomFilterLoad) {
-                    builder.abort();
-                    endedBloomFilterLoad = true;
-                }
-                ((LSMBTreeDiskComponent) component).getBTree().deactivate();
-                ((LSMBTreeDiskComponent) component).getBTree().destroy();
-                if (hasBloomFilter) {
-                    ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
-                    ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
-                }
-            }
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (hasBloomFilter && !endedBloomFilterLoad) {
-                    builder.end();
-                    endedBloomFilterLoad = true;
-                }
-
-                if (component.getLSMComponentFilter() != null) {
-                    filterManager.writeFilter(component.getLSMComponentFilter(),
-                            ((LSMBTreeDiskComponent) component).getBTree());
-                }
-                bulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
-                    //then after operation should be called from harness as well
-                    //https://issues.apache.org/jira/browse/ASTERIXDB-1764
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    lsmHarness.addBulkLoadedComponent(component);
-                }
-            }
-        }
-
-        @Override
-        public void abort() throws HyracksDataException {
-            if (bulkLoader != null) {
-                bulkLoader.abort();
-            }
-
-            if (builder != null) {
-                builder.abort();
-            }
-
-        }
+    @Override
+    protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
+            ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
+            ILSMIOOperationCallback callback) {
+        ILSMIndexAccessor accessor = createAccessor(opCtx);
+        return new LSMBTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir());
     }
 
+    @Override
     public LSMBTreeOpContext createOpContext(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
         int numBloomFilterKeyFields = hasBloomFilter ? componentFactory.getBloomFilterKeyFields().length : 0;
         return new LSMBTreeOpContext(memoryComponents, insertLeafFrameFactory, deleteLeafFrameFactory,
-                modificationCallback, searchCallback, numBloomFilterKeyFields, btreeFields, filterFields, lsmHarness);
+                modificationCallback, searchCallback, numBloomFilterKeyFields, getTreeFields(), getFilterFields(),
+                getLsmHarness(), getFilterCmpFactories());
     }
 
     @Override
     public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
-        return new LSMBTreeAccessor(lsmHarness, createOpContext(modificationCallback, searchCallback));
-    }
-
-    public class LSMBTreeAccessor extends LSMTreeIndexAccessor {
-        public LSMBTreeAccessor(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) {
-            super(lsmHarness, ctx);
-        }
-
-        @Override
-        public IIndexCursor createSearchCursor(boolean exclusive) {
-            return new LSMBTreeSearchCursor(ctx);
-        }
-
-        public MultiComparator getMultiComparator() {
-            LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
-            return concreteCtx.getCmp();
-        }
-    }
-
-    @Override
-    public IBufferCache getBufferCache() {
-        return diskBufferCache;
+        return createAccessor(createOpContext(modificationCallback, searchCallback));
     }
 
-    @Override
-    public IBinaryComparatorFactory[] getComparatorFactories() {
-        return cmpFactories;
+    public ILSMIndexAccessor createAccessor(AbstractLSMIndexOperationContext opCtx) {
+        return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
     }
 
     @Override
@@ -818,17 +524,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public long getMemoryAllocationSize() {
-        long size = 0;
-        for (ILSMComponent c : memoryComponents) {
-            LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-            IBufferCache virtualBufferCache = mutableComponent.getBTree().getBufferCache();
-            size += virtualBufferCache.getNumPages() * virtualBufferCache.getPageSize();
-        }
-        return size;
-    }
-
-    @Override
     public int getRootPageId() {
         LSMBTreeMemoryComponent mutableComponent =
                 (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
@@ -836,13 +531,10 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public void validate() throws HyracksDataException {
-        validateMemoryComponents();
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMDiskComponent c : immutableComponents) {
-            BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
-            btree.validate();
-        }
+    protected long getMemoryComponentSize(ILSMMemoryComponent c) {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        IBufferCache virtualBufferCache = mutableComponent.getBTree().getBufferCache();
+        return virtualBufferCache.getNumPages() * (long) virtualBufferCache.getPageSize();
     }
 
     @Override
@@ -851,11 +543,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public boolean isPrimaryIndex() {
-        return needKeyDupCheck;
-    }
-
-    @Override
     public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
         Set<String> files = new HashSet<>();
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
@@ -867,65 +554,71 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
     }
 
     @Override
-    public synchronized void allocateMemoryComponents() throws HyracksDataException {
-        if (!isActivated) {
-            throw new HyracksDataException("Failed to allocate memory components since the index is not active");
-        }
-        if (memoryComponentsAllocated) {
-            return;
-        }
-        for (ILSMComponent c : memoryComponents) {
-            LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-            ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).open();
-            mutableComponent.getBTree().create();
-            mutableComponent.getBTree().activate();
-        }
-        memoryComponentsAllocated = true;
-    }
-
-    private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) {
-        int cmc = currentMutableComponentId.get();
-        int numMutableComponents = memoryComponents.size();
-        for (int i = 0; i < numMutableComponents - 1; i++) {
-            ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
-            LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-            if (mutableComponent.isReadable()) {
-                // Make sure newest components are added first
-                operationalComponents.add(0, mutableComponent);
-            }
-        }
-        // The current mutable component is always added
-        operationalComponents.add(0, memoryComponents.get(cmc));
+    protected void clearMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        mutableComponent.getBTree().clear();
+        mutableComponent.reset();
     }
 
-    private synchronized void clearMemoryComponents() throws HyracksDataException {
-        if (memoryComponentsAllocated) {
-            for (ILSMComponent c : memoryComponents) {
-                LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-                mutableComponent.getBTree().clear();
-                mutableComponent.reset();
-            }
-        }
+    @Override
+    protected void validateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        mutableComponent.getBTree().validate();
     }
 
-    private synchronized void validateMemoryComponents() throws HyracksDataException {
-        if (memoryComponentsAllocated) {
-            for (ILSMComponent c : memoryComponents) {
-                LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-                mutableComponent.getBTree().validate();
-            }
+    @Override
+    protected void validateDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
+        btree.validate();
+    }
+
+    @Override
+    protected void deactivateDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) c;
+        component.getBTree().deactivateCloseHandle();
+        if (hasBloomFilter) {
+            component.getBloomFilter().deactivate();
         }
     }
 
-    private synchronized void deallocateMemoryComponents() throws HyracksDataException {
-        if (memoryComponentsAllocated) {
-            for (ILSMComponent c : memoryComponents) {
-                LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
-                mutableComponent.getBTree().deactivate();
-                mutableComponent.getBTree().destroy();
-                ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).close();
-            }
-            memoryComponentsAllocated = false;
+    @Override
+    protected void deactivateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        mutableComponent.getBTree().deactivate();
+        mutableComponent.getBTree().destroy();
+        ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).close();
+    }
+
+    @Override
+    protected void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
+        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
+        ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).open();
+        mutableComponent.getBTree().create();
+        mutableComponent.getBTree().activate();
+    }
+
+    @Override
+    protected LSMComponentFileReferences getMergeFileReferences(ILSMDiskComponent firstComponent,
+            ILSMDiskComponent lastComponent) throws HyracksDataException {
+        BTree firstBTree = ((LSMBTreeDiskComponent) firstComponent).getBTree();
+        BTree lastBTree = ((LSMBTreeDiskComponent) lastComponent).getBTree();
+        FileReference firstFile = firstBTree.getFileReference();
+        FileReference lastFile = lastBTree.getFileReference();
+        return fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
+    }
+
+    @Override
+    protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
+            List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
+            ILSMIOOperationCallback callback) {
+        boolean returnDeletedTuples = false;
+        ILSMIndexAccessor accessor = createAccessor(opCtx);
+        if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) {
+            returnDeletedTuples = true;
         }
+        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
+        return new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
+                mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
+                fileManager.getBaseDir());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
new file mode 100644
index 0000000..247b3de
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeBulkLoader implements IIndexBulkLoader {
+    private final LSMBTree lsmIndex;
+    private final ILSMDiskComponent component;
+    private final BTreeBulkLoader bulkLoader;
+    private final IIndexBulkLoader builder;
+    private boolean cleanedUpArtifacts = false;
+    private boolean isEmptyComponent = true;
+    private boolean endedBloomFilterLoad = false;
+    public final PermutingTupleReference indexTuple;
+    public final PermutingTupleReference filterTuple;
+    public final MultiComparator filterCmp;
+
+    public LSMBTreeBulkLoader(LSMBTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint)
+            throws HyracksDataException {
+        this.lsmIndex = lsmIndex;
+        component = lsmIndex.createBulkLoadTarget();
+        bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+                verifyInput, numElementsHint, false);
+
+        if (lsmIndex.hasBloomFilter()) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            BloomFilterSpecification bloomFilterSpec =
+                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, lsmIndex.bloomFilterFalsePositiveRate());
+            builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
+                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+        } else {
+            builder = null;
+        }
+
+        if (lsmIndex.getFilterFields() != null) {
+            indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
+            filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
+            filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
+        } else {
+            indexTuple = null;
+            filterCmp = null;
+            filterTuple = null;
+        }
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            bulkLoader.add(t);
+            if (lsmIndex.hasBloomFilter()) {
+                builder.add(t);
+            }
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    private void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
+                builder.abort();
+                endedBloomFilterLoad = true;
+            }
+            ((LSMBTreeDiskComponent) component).getBTree().deactivate();
+            ((LSMBTreeDiskComponent) component).getBTree().destroy();
+            if (lsmIndex.hasBloomFilter()) {
+                ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
+                ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
+            }
+        }
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
+                builder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            if (component.getLSMComponentFilter() != null) {
+                lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
+                        ((LSMBTreeDiskComponent) component).getBTree());
+            }
+            bulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            } else {
+                //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+                //then after operation should be called from harness as well
+                //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+                lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
+            }
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        if (bulkLoader != null) {
+            bulkLoader.abort();
+        }
+
+        if (builder != null) {
+            builder.abort();
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index c57c35f..4a06778 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -18,92 +18,23 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 
-public class LSMBTreeFlushOperation implements ILSMIOOperation, Comparable<LSMBTreeFlushOperation> {
-
-    private final ILSMIndexAccessor accessor;
-    private final ILSMComponent flushingComponent;
-    private final FileReference btreeFlushTarget;
+public class LSMBTreeFlushOperation extends FlushOperation {
     private final FileReference bloomFilterFlushTarget;
-    private final ILSMIOOperationCallback callback;
-    private final String indexIdentifier;
 
-    public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, ILSMComponent flushingComponent,
-            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
+    public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent,
+            FileReference flushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
             String indexIdentifier) {
-        this.accessor = accessor;
-        this.flushingComponent = flushingComponent;
-        this.btreeFlushTarget = btreeFlushTarget;
+        super(accessor, flushingComponent, flushTarget, callback, indexIdentifier);
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
-        this.callback = callback;
-        this.indexIdentifier = indexIdentifier;
-    }
-
-    @Override
-    public Set<IODeviceHandle> getReadDevices() {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<IODeviceHandle> getWriteDevices() {
-        Set<IODeviceHandle> devs = new HashSet<>();
-        devs.add(btreeFlushTarget.getDeviceHandle());
-        if (bloomFilterFlushTarget != null) {
-            devs.add(bloomFilterFlushTarget.getDeviceHandle());
-        }
-        return devs;
-    }
-
-    @Override
-    public Boolean call() throws HyracksDataException {
-        accessor.flush(this);
-        return true;
-    }
-
-    @Override
-    public ILSMIOOperationCallback getCallback() {
-        return callback;
-    }
-
-    public FileReference getBTreeFlushTarget() {
-        return btreeFlushTarget;
     }
 
-    public FileReference getBloomFilterFlushTarget() {
+    public FileReference getBloomFilterTarget() {
         return bloomFilterFlushTarget;
     }
-
-    public ILSMIndexAccessor getAccessor() {
-        return accessor;
-    }
-
-    public ILSMComponent getFlushingComponent() {
-        return flushingComponent;
-    }
-
-    @Override
-    public String getIndexUniqueIdentifier() {
-        return indexIdentifier;
-    }
-
-    @Override
-    public LSMIOOpertionType getIOOpertionType() {
-        return LSMIOOpertionType.FLUSH;
-    }
-
-    @Override
-    public int compareTo(LSMBTreeFlushOperation o) {
-        return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/639fe8cb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index fbf6c5c..0cc76f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -19,98 +19,27 @@
 
 package org.apache.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 
-public class LSMBTreeMergeOperation implements ILSMIOOperation {
+public class LSMBTreeMergeOperation extends MergeOperation {
 
-    private final ILSMIndexAccessor accessor;
-    private final List<ILSMComponent> mergingComponents;
-    private final ITreeIndexCursor cursor;
-    private final FileReference btreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
-    private final ILSMIOOperationCallback callback;
-    private final String indexIdentifier;
 
     public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents,
-            ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+            ITreeIndexCursor cursor, FileReference target, FileReference bloomFilterMergeTarget,
             ILSMIOOperationCallback callback, String indexIdentifier) {
-        this.accessor = accessor;
-        this.mergingComponents = mergingComponents;
-        this.cursor = cursor;
-        this.btreeMergeTarget = btreeMergeTarget;
+        super(accessor, target, callback, indexIdentifier, mergingComponents, cursor);
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
-        this.callback = callback;
-        this.indexIdentifier = indexIdentifier;
     }
 
-    @Override
-    public Set<IODeviceHandle> getReadDevices() {
-        Set<IODeviceHandle> devs = new HashSet<>();
-        for (ILSMComponent o : mergingComponents) {
-            LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) o;
-            devs.add(component.getBTree().getFileReference().getDeviceHandle());
-            if (bloomFilterMergeTarget != null) {
-                devs.add(component.getBloomFilter().getFileReference().getDeviceHandle());
-            }
-        }
-        return devs;
-    }
-
-    @Override
-    public Set<IODeviceHandle> getWriteDevices() {
-        Set<IODeviceHandle> devs = new HashSet<>();
-        devs.add(btreeMergeTarget.getDeviceHandle());
-        if (bloomFilterMergeTarget != null) {
-            devs.add(bloomFilterMergeTarget.getDeviceHandle());
-        }
-        return devs;
-    }
-
-    @Override
-    public Boolean call() throws HyracksDataException {
-        accessor.merge(this);
-        return true;
-    }
-
-    @Override
-    public ILSMIOOperationCallback getCallback() {
-        return callback;
-    }
-
-    public FileReference getBTreeMergeTarget() {
-        return btreeMergeTarget;
-    }
-
-    public FileReference getBloomFilterMergeTarget() {
+    public FileReference getBloomFilterTarget() {
         return bloomFilterMergeTarget;
     }
-
-    public ITreeIndexCursor getCursor() {
-        return cursor;
-    }
-
-    public List<ILSMComponent> getMergingComponents() {
-        return mergingComponents;
-    }
-
-    @Override
-    public String getIndexUniqueIdentifier() {
-        return indexIdentifier;
-    }
-
-    @Override
-    public LSMIOOpertionType getIOOpertionType() {
-        return LSMIOOpertionType.MERGE;
-    }
 }


Mime
View raw message