asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luo Chen (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Add LSM Disk Component Bulk Loader
Date Tue, 23 May 2017 23:01:35 GMT
Luo Chen has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1773

Change subject: Add LSM Disk Component Bulk Loader
......................................................................

Add LSM Disk Component Bulk Loader

-Added ILSMDiskComponentBulkLoader interface, which is used to bulk
load an LSMDiskComponent with anti-matters
-Added ILSMDiskComponentWithBuddyBTreeBulkLoader, which is used to
bulk load an LSMDiskComponent with deleted-keys btrees
-Refactored LSM flush/merge/index bulk load operations to use
the LSMDiskComponentBulkLoader

Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
---
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
12 files changed, 873 insertions(+), 505 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/73/1773/1

diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 1c99d5a..d9e4da3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -36,7 +36,6 @@
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.IPageManager;
@@ -46,12 +45,12 @@
 import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -426,12 +425,10 @@
 
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
         long numElements = 0L;
-        BloomFilterSpecification bloomFilterSpec = null;
         if (hasBloomFilter) {
             //count elements in btree for creating Bloomfilter
             IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
             accessor.search(countingCursor, nullPred);
-
             try {
                 while (countingCursor.hasNext()) {
                     countingCursor.next();
@@ -441,35 +438,22 @@
             } finally {
                 countingCursor.close();
             }
-
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
 
         LSMBTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
                 flushOp.getBloomFilterFlushTarget(), true);
-        IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements, false);
-        IIndexBulkLoader builder = null;
-        if (hasBloomFilter) {
-            builder = component.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
-                    bloomFilterSpec.getNumBucketsPerElements());
-        }
+        ILSMDiskComponentBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, numElements, false, false);
 
         IIndexCursor scanCursor = accessor.createSearchCursor(false);
         accessor.search(scanCursor, nullPred);
         try {
             while (scanCursor.hasNext()) {
                 scanCursor.next();
-                if (hasBloomFilter) {
-                    builder.add(scanCursor.getTuple());
-                }
-                bulkLoader.add(scanCursor.getTuple());
+                componentBulkLoader.add(scanCursor.getTuple());
             }
         } finally {
             scanCursor.close();
-            if (hasBloomFilter) {
-                builder.end();
-            }
         }
 
         if (component.getLSMComponentFilter() != null) {
@@ -487,7 +471,8 @@
         // TODO This code should be in the callback and not in the index
         flushingComponent.getMetadata().copy(component.getMetadata());
 
-        bulkLoader.end();
+        componentBulkLoader.end();
+
         return component;
     }
 
@@ -526,38 +511,25 @@
         List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
 
         long numElements = 0L;
-        BloomFilterSpecification bloomFilterSpec = null;
         if (hasBloomFilter) {
             //count elements in btree for creating Bloomfilter
             for (int i = 0; i < mergedComponents.size(); ++i) {
                 numElements += ((LSMBTreeDiskComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
             }
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
         LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
                 mergeOp.getBloomFilterMergeTarget(), true);
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
-        IIndexBulkLoader builder = null;
-        if (hasBloomFilter) {
-            builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
-                    bloomFilterSpec.getNumBucketsPerElements());
-        }
+        ILSMDiskComponentBulkLoader componentBulkLoader =
+                createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                if (hasBloomFilter) {
-                    builder.add(frameTuple);
-                }
-                bulkLoader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
-            if (hasBloomFilter) {
-                builder.end();
-            }
         }
         if (mergedComponent.getLSMComponentFilter() != null) {
             List<ITupleReference> filterTuples = new ArrayList<>();
@@ -568,7 +540,9 @@
             filterManager.updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
         }
-        bulkLoader.end();
+
+        componentBulkLoader.end();
+
         return mergedComponent;
     }
 
@@ -593,6 +567,26 @@
             filterManager.readFilter(component.getLSMComponentFilter(), component.getBTree());
         }
         return component;
+    }
+
+    public ILSMDiskComponentBulkLoader createComponentBulkLoader(LSMBTreeDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (hasBloomFilter) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+
+        if (withFilter && filterFields != null) {
+            return new LSMBTreeDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, filterManager, btreeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMBTreeDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex);
+        }
+
     }
 
     @Override
@@ -620,14 +614,7 @@
 
     public class LSMBTreeBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final BTreeBulkLoader bulkLoader;
-        private final IIndexBulkLoader builder;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        private boolean endedBloomFilterLoad = false;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final ILSMDiskComponentBulkLoader bulkLoader;
 
         public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -635,111 +622,30 @@
                 throw HyracksDataException.create(ErrorCode.LOAD_NON_EMPTY_INDEX);
             }
             component = createBulkLoadTarget();
-            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
-
-            if (hasBloomFilter) {
-                int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-                BloomFilterSpecification bloomFilterSpec =
-                        BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-                builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
-                        bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-            } else {
-                builder = null;
-            }
-
-            if (filterFields != null) {
-                indexTuple = new PermutingTupleReference(btreeFields);
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(filterFields);
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            bulkLoader = createComponentBulkLoader((LSMBTreeDiskComponent) component, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-                if (hasBloomFilter) {
-                    builder.add(t);
-                }
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                if (hasBloomFilter && !endedBloomFilterLoad) {
-                    builder.abort();
-                    endedBloomFilterLoad = true;
-                }
-                ((LSMBTreeDiskComponent) component).getBTree().deactivate();
-                ((LSMBTreeDiskComponent) component).getBTree().destroy();
-                if (hasBloomFilter) {
-                    ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
-                    ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
-                }
-            }
+            bulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (hasBloomFilter && !endedBloomFilterLoad) {
-                    builder.end();
-                    endedBloomFilterLoad = true;
-                }
-
-                if (component.getLSMComponentFilter() != null) {
-                    filterManager.writeFilter(component.getLSMComponentFilter(),
-                            ((LSMBTreeDiskComponent) component).getBTree());
-                }
-                bulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
-                    //then after operation should be called from harness as well
-                    //https://issues.apache.org/jira/browse/ASTERIXDB-1764
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    lsmHarness.addBulkLoadedComponent(component);
-                }
+            bulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+                //then after operation should be called from harness as well
+                //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (bulkLoader != null) {
-                bulkLoader.abort();
-            }
-
-            if (builder != null) {
-                builder.abort();
-            }
-
+            bulkLoader.end();
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..a5720de
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    //with filter
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMBTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMBTreeDiskComponent) component).getBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..cdb3e73
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface ILSMDiskComponentBulkLoader {
+    /**
+     * Append a tuple to the disk component in the context of a bulk load.
+     *
+     * @param tuple
+     *            Tuple to be inserted.
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    public void add(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Finalize the bulk loading operation in the given context.
+     *
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    public void end() throws HyracksDataException;
+
+    /**
+     * Release all resources held by this bulkloader, with no guarantee of
+     * persisted content.
+     */
+    void abort() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
new file mode 100644
index 0000000..8ef79ed
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface ILSMDiskComponentWithBuddyBTreeBulkLoader extends ILSMDiskComponentBulkLoader {
+    /**
+     * Append a deleted tuple to the disk component in the context of a bulk load.
+     *
+     * @param tuple
+     *            Tuple to be inserted.
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    public void addDeletedTuple(ITupleReference tuple) throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..1893a28
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public abstract class AbstractLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader {
+    protected final ILSMDiskComponent component;
+
+    protected final IIndexBulkLoader indexBulkLoader;
+    protected final IIndexBulkLoader bloomFilterBuilder;
+
+    protected final ILSMComponentFilterManager filterManager;
+    protected final PermutingTupleReference indexTuple;
+    protected final PermutingTupleReference filterTuple;
+    protected final MultiComparator filterCmp;
+
+    protected boolean cleanedUpArtifacts = false;
+    protected boolean isEmptyComponent = true;
+    protected boolean endedBloomFilterLoad = false;
+
+    //with filter
+    public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        this.component = component;
+        this.indexBulkLoader =
+                getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        if (bloomFilterSpec != null) {
+            this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint,
+                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+        } else {
+            this.bloomFilterBuilder = null;
+        }
+        if (filterManager != null) {
+            this.filterManager = filterManager;
+            this.indexTuple = new PermutingTupleReference(indexFields);
+            this.filterTuple = new PermutingTupleReference(filterFields);
+            this.filterCmp = filterCmp;
+        } else {
+            this.filterManager = null;
+            this.indexTuple = null;
+            this.filterTuple = null;
+            this.filterCmp = null;
+        }
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(t);
+            }
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        if (indexBulkLoader != null) {
+            indexBulkLoader.abort();
+        }
+        if (bloomFilterBuilder != null) {
+            bloomFilterBuilder.abort();
+        }
+
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    /**
+     * TreeIndex is used to hold the filter tuple values
+     *
+     * @param component
+     * @return
+     */
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return (ITreeIndex) getIndex(component);
+    }
+
+    protected abstract IIndex getIndex(ILSMDiskComponent component);
+
+    protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
new file mode 100644
index 0000000..0cebced
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public abstract class AbstractLSMDiskComponentWithBuddyBTreeBulkLoader extends AbstractLSMDiskComponentBulkLoader
+        implements ILSMDiskComponentWithBuddyBTreeBulkLoader {
+
+    protected final IIndexBulkLoader buddyBTreeBulkLoader;
+
+    //with filter
+    public AbstractLSMDiskComponentWithBuddyBTreeBulkLoader(ILSMDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+
+        // BuddyBTree must be created even if it could be empty,
+        // since without it the component is not considered as valid.
+        buddyBTreeBulkLoader =
+                getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void addDeletedTuple(ITupleReference tuple) throws HyracksDataException {
+        try {
+            buddyBTreeBulkLoader.add(tuple);
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        super.abort();
+        if (buddyBTreeBulkLoader != null) {
+            buddyBTreeBulkLoader.abort();
+        }
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+            buddyBTreeBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    @Override
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+
+            getBuddyBTree(component).deactivate();
+            getBuddyBTree(component).destroy();
+
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 3d7b716..c6178c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -52,6 +52,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -465,39 +466,14 @@
         LSMInvertedIndexDiskComponent component =
                 createDiskInvIndexComponent(componentFactory, flushOp.getDictBTreeFlushTarget(),
                         flushOp.getDeletedKeysBTreeFlushTarget(), flushOp.getBloomFilterFlushTarget(), true);
-        IInvertedIndex diskInvertedIndex = component.getInvIndex();
 
         // Create a scan cursor on the BTree underlying the in-memory inverted index.
         LSMInvertedIndexMemoryComponent flushingComponent =
                 (LSMInvertedIndexMemoryComponent) flushOp.getFlushingComponent();
-        InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
-                .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
-        IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
-        memBTreeAccessor.search(scanCursor, nullPred);
 
-        // Bulk load the disk inverted index from the in-memory inverted index.
-        IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L, false);
-        try {
-            while (scanCursor.hasNext()) {
-                scanCursor.next();
-                invIndexBulkLoader.add(scanCursor.getTuple());
-            }
-        } finally {
-            scanCursor.close();
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-            filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-            filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
-            filterManager.writeFilter(component.getLSMComponentFilter(),
-                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
-        }
-        flushingComponent.getMetadata().copy(component.getMetadata());
-        invIndexBulkLoader.end();
-
+        // Search the deleted keys BTree to calculate the number of elements for BloomFilter
         IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree()
                 .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor();
@@ -513,33 +489,49 @@
             btreeCountingCursor.close();
         }
 
-        int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
-        BloomFilterSpecification bloomFilterSpec =
-                BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-
-        // Create an BTree instance for the deleted keys.
-        BTree diskDeletedKeysBTree = component.getDeletedKeysBTree();
+        ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, true, 1.0f, false, numBTreeTuples, false, false);
 
         // Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index.
         IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false);
         deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
 
-        // Bulk load the deleted-keys BTree.
-        IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L, false);
-        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
-                bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
         try {
             while (deletedKeysScanCursor.hasNext()) {
                 deletedKeysScanCursor.next();
-                deletedKeysBTreeBulkLoader.add(deletedKeysScanCursor.getTuple());
-                builder.add(deletedKeysScanCursor.getTuple());
+                componentBulkLoader.addDeletedTuple(deletedKeysScanCursor.getTuple());
             }
         } finally {
             deletedKeysScanCursor.close();
-            builder.end();
         }
-        deletedKeysBTreeBulkLoader.end();
+
+        // Scan the in-memory inverted index
+        InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
+                .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+        IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
+        memBTreeAccessor.search(scanCursor, nullPred);
+
+        // Bulk load the disk inverted index from the in-memory inverted index.
+        try {
+            while (scanCursor.hasNext()) {
+                scanCursor.next();
+                componentBulkLoader.add(scanCursor.getTuple());
+            }
+        } finally {
+            scanCursor.close();
+        }
+        if (component.getLSMComponentFilter() != null) {
+            List<ITupleReference> filterTuples = new ArrayList<>();
+            filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+            filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+            filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
+            filterManager.writeFilter(component.getLSMComponentFilter(),
+                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
+        }
+        flushingComponent.getMetadata().copy(component.getMetadata());
+
+        componentBulkLoader.end();
 
         return component;
     }
@@ -585,7 +577,7 @@
                 createDiskInvIndexComponent(componentFactory, mergeOp.getDictBTreeMergeTarget(),
                         mergeOp.getDeletedKeysBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
 
-        IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+        ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
 
         // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
         // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
@@ -597,44 +589,31 @@
                     new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx);
             search(opCtx, btreeCursor, mergePred);
 
-            BTree btree = component.getDeletedKeysBTree();
-            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
-
             long numElements = 0L;
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
                 numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader = createComponentBulkLoader(component, true, 1.0f, false, numElements, false, false);
             try {
                 while (btreeCursor.hasNext()) {
                     btreeCursor.next();
                     ITupleReference tuple = btreeCursor.getTuple();
-                    btreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    componentBulkLoader.addDeletedTuple(tuple);
                 }
             } finally {
                 btreeCursor.close();
-                builder.end();
             }
-            btreeBulkLoader.end();
         } else {
-            BTree btree = component.getDeletedKeysBTree();
-            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
-            btreeBulkLoader.end();
+            componentBulkLoader = createComponentBulkLoader(component, false, 1.0f, false, 0L, false, false);
         }
 
-        IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference tuple = cursor.getTuple();
-                invIndexBulkLoader.add(tuple);
+                componentBulkLoader.add(tuple);
             }
         } finally {
             cursor.close();
@@ -655,9 +634,28 @@
             filterManager.writeFilter(component.getLSMComponentFilter(),
                     ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
         }
-        invIndexBulkLoader.end();
+
+        componentBulkLoader.end();
 
         return component;
+    }
+
+    public ILSMDiskComponentWithBuddyBTreeBulkLoader createComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+            boolean hasBloomFilter, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, boolean withFilter) throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (hasBloomFilter) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMInvertedIndexDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, filterManager, invertedIndexFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMInvertedIndexDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex);
+        }
     }
 
     @Override
@@ -668,13 +666,7 @@
 
     public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader invIndexBulkLoader;
-        private final IIndexBulkLoader deletedKeysBTreeBulkLoader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
 
         public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -684,92 +676,27 @@
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             component = createBulkLoadTarget();
-            invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
-
-            //validity of the component depends on the deleted keys file being there even if it's empty.
-            deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree()
-                    .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
-
-            if (filterFields != null) {
-                indexTuple = new PermutingTupleReference(invertedIndexFields);
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(filterFields);
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            componentBulkLoader = createComponentBulkLoader((LSMInvertedIndexDiskComponent) component, false,
+                    fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                invIndexBulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMInvertedIndexDiskComponent) component).getInvIndex().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getInvIndex().destroy();
-                ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().destroy();
-                ((LSMInvertedIndexDiskComponent) component).getBloomFilter().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getBloomFilter().destroy();
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (component.getLSMComponentFilter() != null) {
-                    filterManager.writeFilter(component.getLSMComponentFilter(),
-                            (((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex())
-                                    .getBTree()));
-                }
-                invIndexBulkLoader.end();
-                deletedKeysBTreeBulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    lsmHarness.addBulkLoadedComponent(component);
-                }
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (invIndexBulkLoader != null) {
-                invIndexBulkLoader.abort();
-            }
-
-            if (deletedKeysBTreeBulkLoader != null) {
-                deletedKeysBTreeBulkLoader.abort();
-            }
+            componentBulkLoader.abort();
         }
 
         private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
new file mode 100644
index 0000000..4322852
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBTreeBulkLoader;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMInvertedIndexDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBTreeBulkLoader {
+
+    //with filter
+    public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getInvIndex();
+    }
+
+    @Override
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex()).getBTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8882639..95db8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -45,11 +45,11 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.tuples.DualTupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -60,7 +60,6 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -228,47 +227,8 @@
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
                 flushOp.getBTreeFlushTarget(), flushOp.getBloomFilterFlushTarget(), true);
-        RTree diskRTree = component.getRTree();
-        IIndexBulkLoader rTreeBulkloader;
-        ITreeIndexCursor cursor;
 
-        IBinaryComparatorFactory[] linearizerArray = { linearizer };
-
-        TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
-                linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
-                flushingComponent.getRTree().getBufferCache(), comparatorFields);
-        // BulkLoad the tuples from the in-memory tree into the new disk
-        // RTree.
-
-        boolean isEmpty = true;
-        try {
-            while (rtreeScanCursor.hasNext()) {
-                isEmpty = false;
-                rtreeScanCursor.next();
-                rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
-            }
-        } finally {
-            rtreeScanCursor.close();
-        }
-        rTreeTupleSorter.sort();
-
-        rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
-        cursor = rTreeTupleSorter;
-
-        if (!isEmpty) {
-            try {
-                while (cursor.hasNext()) {
-                    cursor.next();
-                    ITupleReference frameTuple = cursor.getTuple();
-                    rTreeBulkloader.add(frameTuple);
-                }
-            } finally {
-                cursor.close();
-            }
-        }
-
-        rTreeBulkloader.end();
-
+        //count the number of tuples in the buddy btree
         ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
                 .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
@@ -285,29 +245,56 @@
             btreeCountingCursor.close();
         }
 
-        int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
-        BloomFilterSpecification bloomFilterSpec =
-                BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, true, 1.0f, false, numBTreeTuples, false, false);
+
+        ITreeIndexCursor cursor;
+        IBinaryComparatorFactory[] linearizerArray = { linearizer };
+
+        TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
+                linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
+                flushingComponent.getRTree().getBufferCache(), comparatorFields);
+
+        // BulkLoad the tuples from the in-memory tree into the new disk
+        // RTree.
+        boolean isEmpty = true;
+        try {
+            while (rtreeScanCursor.hasNext()) {
+                isEmpty = false;
+                rtreeScanCursor.next();
+                rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
+            }
+        } finally {
+            rtreeScanCursor.close();
+        }
+        rTreeTupleSorter.sort();
+
+        cursor = rTreeTupleSorter;
+
+        if (!isEmpty) {
+            try {
+                while (cursor.hasNext()) {
+                    cursor.next();
+                    ITupleReference frameTuple = cursor.getTuple();
+                    componentBulkLoader.add(frameTuple);
+                }
+            } finally {
+                cursor.close();
+            }
+        }
 
         IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false);
         memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-        BTree diskBTree = component.getBTree();
 
-        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
-        IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples, false);
-        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
-                bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
         // scan the memory BTree
         try {
             while (btreeScanCursor.hasNext()) {
                 btreeScanCursor.next();
                 ITupleReference frameTuple = btreeScanCursor.getTuple();
-                bTreeBulkloader.add(frameTuple);
-                builder.add(frameTuple);
+                componentBulkLoader.addDeletedTuple(frameTuple);
             }
         } finally {
             btreeScanCursor.close();
-            builder.end();
         }
 
         if (component.getLSMComponentFilter() != null) {
@@ -319,7 +306,8 @@
         }
         // Note. If we change the filter to write to metadata object, we don't need the if block above
         flushingComponent.getMetadata().copy(component.getMetadata());
-        bTreeBulkloader.end();
+
+        componentBulkLoader.end();
         return component;
     }
 
@@ -350,10 +338,10 @@
         LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
                 mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
 
+        ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
+
         // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
         // lsmHarness.endSearch() is called once when the r-trees have been merged.
-        BTree btree = mergedComponent.getBTree();
-        IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
         if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
                 .get(diskComponents.size() - 1)) {
             // Keep the deleted tuples since the oldest disk component is not included in the merge operation
@@ -366,24 +354,31 @@
                 numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
-
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader =
+                    createComponentBulkLoader(mergedComponent, true, 1.0f, false, numElements, false, true);
 
             try {
                 while (btreeCursor.hasNext()) {
                     btreeCursor.next();
                     ITupleReference tuple = btreeCursor.getTuple();
-                    btreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    componentBulkLoader.addDeletedTuple(tuple);
                 }
             } finally {
                 btreeCursor.close();
-                builder.end();
             }
+        } else {
+            //no buddy-btree needed
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, true, 1.0f, false, 0L, false, true);
+        }
+
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference frameTuple = cursor.getTuple();
+                componentBulkLoader.add(frameTuple);
+            }
+        } finally {
+            cursor.close();
         }
 
         if (mergedComponent.getLSMComponentFilter() != null) {
@@ -395,19 +390,8 @@
             filterManager.updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree());
         }
-        btreeBulkLoader.end();
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
-        try {
-            while (cursor.hasNext()) {
-                cursor.next();
-                ITupleReference frameTuple = cursor.getTuple();
-                bulkLoader.add(frameTuple);
-            }
-        } finally {
-            cursor.close();
-        }
-        bulkLoader.end();
+        componentBulkLoader.end();
 
         return mergedComponent;
     }
@@ -464,6 +448,24 @@
                 componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true);
     }
 
+    public ILSMDiskComponentWithBuddyBTreeBulkLoader createComponentBulkLoader(LSMRTreeDiskComponent component,
+            boolean hasBloomFilter, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, boolean withFilter) throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (hasBloomFilter) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, filterManager, rtreeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeDiskComponentBulkLoader(component, bloomFilterSpec, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
     @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -512,13 +514,7 @@
 
     public class LSMRTreeBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader bulkLoader;
-        private final IIndexBulkLoader buddyBTreeBulkloader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
 
         public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -528,88 +524,23 @@
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             component = createBulkLoadTarget();
-            bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, false);
-            buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
-            if (filterFields != null) {
-                indexTuple = new PermutingTupleReference(rtreeFields);
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(filterFields);
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            componentBulkLoader = createComponentBulkLoader((LSMRTreeDiskComponent) component, false, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-
-                if (component.getLSMComponentFilter() != null) {
-                    filterManager.writeFilter(component.getLSMComponentFilter(),
-                            ((LSMRTreeDiskComponent) component).getRTree());
-                }
-
-                bulkLoader.end();
-                buddyBTreeBulkloader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    lsmHarness.addBulkLoadedComponent(component);
-                }
-            }
+            componentBulkLoader.end();
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (bulkLoader != null) {
-                bulkLoader.abort();
-            }
-            if (buddyBTreeBulkloader != null) {
-                buddyBTreeBulkloader.abort();
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-                ((LSMRTreeDiskComponent) component).getRTree().destroy();
-                ((LSMRTreeDiskComponent) component).getBTree().deactivate();
-                ((LSMRTreeDiskComponent) component).getBTree().destroy();
-                ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate();
-                ((LSMRTreeDiskComponent) component).getBloomFilter().destroy();
-            }
+            componentBulkLoader.abort();
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..34a9be0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBTreeBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBTreeBulkLoader {
+
+    //with filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 018e6d3..ca5b45c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -37,11 +37,11 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -179,7 +179,6 @@
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
         LSMRTreeDiskComponent component =
                 createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(), null, null, true);
-        RTree diskRTree = component.getRTree();
 
         // scan the memory BTree
         ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
@@ -228,7 +227,9 @@
             bTreeTupleSorter.sort();
         }
 
-        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
+        ILSMDiskComponentBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
+
         LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
                 bTreeTupleSorter, comparatorFields, linearizerArray);
         cursor.open(null, null);
@@ -238,7 +239,7 @@
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
 
-                rTreeBulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -252,8 +253,8 @@
             filterManager.writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
         flushingComponent.getMetadata().copy(component.getMetadata());
-        rTreeBulkloader.end();
 
+        componentBulkLoader.end();
         return component;
     }
 
@@ -287,13 +288,13 @@
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
         LSMRTreeDiskComponent component =
                 createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(), null, null, true);
-        RTree mergedRTree = component.getRTree();
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false);
+        ILSMDiskComponentBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                bulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -307,7 +308,8 @@
             filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
-        bulkloader.end();
+
+        componentBulkLoader.end();
 
         return component;
     }
@@ -335,6 +337,19 @@
         }
     }
 
+    public ILSMDiskComponentBulkLoader createComponentBulkLoader(LSMRTreeDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(component, null, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, filterManager, rtreeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(component, null, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
     @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -343,12 +358,7 @@
 
     public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader bulkLoader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final ILSMDiskComponentBulkLoader bulkLoader;
 
         public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -359,63 +369,21 @@
             // new bulk loaded tree is "newer" than any other merged tree.
 
             component = createBulkLoadTarget();
-            bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, false);
-
-            if (filterFields != null) {
-                indexTuple = new PermutingTupleReference(rtreeFields);
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(filterFields);
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            bulkLoader = createComponentBulkLoader((LSMRTreeDiskComponent) component, fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            bulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-
-                if (component.getLSMComponentFilter() != null) {
-                    filterManager.writeFilter(component.getLSMComponentFilter(),
-                            ((LSMRTreeDiskComponent) component).getRTree());
-                }
-                bulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    lsmHarness.addBulkLoadedComponent(component);
-                }
+            bulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
@@ -423,14 +391,6 @@
         public void abort() throws HyracksDataException {
             if (bulkLoader != null) {
                 bulkLoader.abort();
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-                ((LSMRTreeDiskComponent) component).getRTree().destroy();
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
new file mode 100644
index 0000000..88a2e1e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    //with filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected ITreeIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1773
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <cluo8@uci.edu>


Mime
View raw message