asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Wail Alkowaileet (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [WIP] Extract bulk loader out
Date Wed, 07 Feb 2018 00:37:07 GMT
Wail Alkowaileet has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2367

Change subject: [WIP] Extract bulk loader out
......................................................................

[WIP] Extract bulk loader out

TODO:
- Clean up InvertedIndex bulkloader
- Extract external indexes bulkloader

Change-Id: I7f42a391a4de4b02acf6a8fdaf2b60818c1da806
---
M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
A hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
A hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
A hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
A hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
12 files changed, 915 insertions(+), 690 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/67/2367/1

diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 6e2d694..98cb391 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -41,7 +41,6 @@
 import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
 import org.apache.hyracks.storage.am.common.api.IBTreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.IPageManager;
-import org.apache.hyracks.storage.am.common.api.ISplitKey;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
@@ -50,7 +49,6 @@
 import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
 import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -61,7 +59,6 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
-import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -1000,204 +997,7 @@
     @Override
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws HyracksDataException {
-        return new BTreeBulkLoader(fillFactor, verifyInput);
-    }
-
-    public class BTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
-        protected final ISplitKey splitKey;
-        protected final boolean verifyInput;
-
-        public BTreeBulkLoader(float fillFactor, boolean verifyInput) throws HyracksDataException {
-            super(fillFactor);
-            this.verifyInput = verifyInput;
-            splitKey = new BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
-            splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
-        }
-
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
-                        interiorFrame.getBytesRequiredToWriteTuple(tuple));
-
-                NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
-                int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
-                int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
-
-                // try to free space by compression
-                if (spaceUsed + spaceNeeded > leafMaxBytes) {
-                    leafFrame.compress();
-                    spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
-                }
-                //full, allocate new page
-                if (spaceUsed + spaceNeeded > leafMaxBytes) {
-                    if (leafFrame.getTupleCount() == 0) {
-                        bufferCache.returnPage(leafFrontier.page, false);
-                    } else {
-                        leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
-                        if (verifyInput) {
-                            verifyInputTuple(tuple, leafFrontier.lastTuple);
-                        }
-                        int splitKeySize = tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
-                        splitKey.initData(splitKeySize);
-                        tupleWriter.writeTupleFields(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount(),
-                                splitKey.getBuffer().array(), 0);
-                        splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-                        splitKey.setLeftPage(leafFrontier.pageId);
-
-                        propagateBulk(1, pagesToWrite);
-
-                        leafFrontier.pageId = freePageManager.takePage(metaFrame);
-
-                        ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
-
-                        queue.put(leafFrontier.page);
-                        for (ICachedPage c : pagesToWrite) {
-                            queue.put(c);
-                        }
-                        pagesToWrite.clear();
-
-                        splitKey.setRightPage(leafFrontier.pageId);
-                    }
-                    if (tupleSize > maxTupleSize) {
-                        final long dpid = BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
-                        // calculate required number of pages.
-                        int headerSize = Math.max(leafFrame.getPageHeaderSize(), interiorFrame.getPageHeaderSize());
-                        final int multiplier =
-                                (int) Math.ceil((double) tupleSize / (bufferCache.getPageSize() - headerSize));
-                        if (multiplier > 1) {
-                            leafFrontier.page = bufferCache.confiscateLargePage(dpid, multiplier,
-                                    freePageManager.takeBlock(metaFrame, multiplier - 1));
-                        } else {
-                            leafFrontier.page = bufferCache.confiscatePage(dpid);
-                        }
-                        leafFrame.setPage(leafFrontier.page);
-                        leafFrame.initBuffer((byte) 0);
-                        ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
-                    } else {
-                        final long dpid = BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
-                        leafFrontier.page = bufferCache.confiscatePage(dpid);
-                        leafFrame.setPage(leafFrontier.page);
-                        leafFrame.initBuffer((byte) 0);
-                    }
-                } else {
-                    if (verifyInput && leafFrame.getTupleCount() > 0) {
-                        leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
-                        verifyInputTuple(tuple, leafFrontier.lastTuple);
-                    }
-                }
-                ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
-            } catch (HyracksDataException | RuntimeException e) {
-                handleException();
-                throw e;
-            }
-        }
-
-        protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws HyracksDataException {
-            // New tuple should be strictly greater than last tuple.
-            int cmpResult = cmp.compare(tuple, prevTuple);
-            if (cmpResult < 0) {
-                throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
-            }
-            if (cmpResult == 0) {
-                throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
-            }
-        }
-
-        protected void propagateBulk(int level, List<ICachedPage> pagesToWrite) throws HyracksDataException {
-            if (splitKey.getBuffer() == null) {
-                return;
-            }
-
-            if (level >= nodeFrontiers.size()) {
-                addLevel();
-            }
-
-            NodeFrontier frontier = nodeFrontiers.get(level);
-            interiorFrame.setPage(frontier.page);
-
-            ITupleReference tuple = splitKey.getTuple();
-            int tupleBytes = tupleWriter.bytesRequired(tuple, 0, cmp.getKeyFieldCount());
-            int spaceNeeded = tupleBytes + slotSize + 4;
-            if (tupleBytes > interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize())) {
-                throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleBytes,
-                        interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize()));
-            }
-
-            int spaceUsed = interiorFrame.getBuffer().capacity() - interiorFrame.getTotalFreeSpace();
-            if (spaceUsed + spaceNeeded > interiorMaxBytes) {
-
-                ISplitKey copyKey = splitKey.duplicate(leafFrame.getTupleWriter().createTupleReference());
-                tuple = copyKey.getTuple();
-
-                frontier.lastTuple.resetByTupleIndex(interiorFrame, interiorFrame.getTupleCount() - 1);
-                int splitKeySize = tupleWriter.bytesRequired(frontier.lastTuple, 0, cmp.getKeyFieldCount());
-                splitKey.initData(splitKeySize);
-                tupleWriter.writeTupleFields(frontier.lastTuple, 0, cmp.getKeyFieldCount(),
-                        splitKey.getBuffer().array(), 0);
-                splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-
-                ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
-                int finalPageId = freePageManager.takePage(metaFrame);
-                frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-                pagesToWrite.add(frontier.page);
-                splitKey.setLeftPage(finalPageId);
-
-                propagateBulk(level + 1, pagesToWrite);
-                frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
-                interiorFrame.setPage(frontier.page);
-                interiorFrame.initBuffer((byte) level);
-            }
-            ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
-        }
-
-        private void persistFrontiers(int level, int rightPage) throws HyracksDataException {
-            if (level >= nodeFrontiers.size()) {
-                rootPage = nodeFrontiers.get(level - 1).pageId;
-                releasedLatches = true;
-                return;
-            }
-            if (level < 1) {
-                ICachedPage lastLeaf = nodeFrontiers.get(level).page;
-                int lastLeafPage = nodeFrontiers.get(level).pageId;
-                lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
-                queue.put(lastLeaf);
-                nodeFrontiers.get(level).page = null;
-                persistFrontiers(level + 1, lastLeafPage);
-                return;
-            }
-            NodeFrontier frontier = nodeFrontiers.get(level);
-            interiorFrame.setPage(frontier.page);
-            //just finalize = the layer right above the leaves has correct righthand pointers already
-            if (rightPage < 0) {
-                throw new HyracksDataException(
-                        "Error in index creation. Internal node appears to have no rightmost guide");
-            }
-            ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
-            int finalPageId = freePageManager.takePage(metaFrame);
-            frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-            queue.put(frontier.page);
-            frontier.pageId = finalPageId;
-
-            persistFrontiers(level + 1, finalPageId);
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            try {
-                persistFrontiers(0, -1);
-                super.end();
-            } catch (HyracksDataException | RuntimeException e) {
-                handleException();
-                throw e;
-            }
-        }
-
-        @Override
-        public void abort() throws HyracksDataException {
-            super.handleException();
-        }
+        return new BTreeBulkLoader(this, fillFactor, verifyInput, maxTupleSize, rootPageSetter);
     }
 
     @SuppressWarnings("rawtypes")
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
new file mode 100644
index 0000000..660e7d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.impls;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.ISplitKey;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public class BTreeBulkLoader extends AbstractTreeIndexBulkLoader {
+    protected final ISplitKey splitKey;
+    protected final boolean verifyInput;
+
+    public BTreeBulkLoader(BTree index, float fillFactor, boolean verifyInput, int maxTupleSize,
+            IBulkLoadFinalizer<Void, Integer> finalizer) throws HyracksDataException {
+        super(index, fillFactor, maxTupleSize, finalizer);
+        this.verifyInput = verifyInput;
+        splitKey = new BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
+        splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
+                    interiorFrame.getBytesRequiredToWriteTuple(tuple));
+
+            NodeFrontier leafFrontier = nodeFrontiers.get(0);
+
+            int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
+            int spaceUsed = getFrameUsedSpace(leafFrame);
+
+            // try to free space by compression
+            if (spaceUsed + spaceNeeded > leafMaxBytes) {
+                leafFrame.compress();
+                spaceUsed = getFrameUsedSpace(leafFrame);
+            }
+            //full, allocate new page
+            if (spaceUsed + spaceNeeded > leafMaxBytes) {
+                if (leafFrame.getTupleCount() == 0) {
+                    bufferCache.returnPage(leafFrontier.page, false);
+                } else {
+                    leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
+                    if (verifyInput) {
+                        verifyInputTuple(tuple, leafFrontier.lastTuple);
+                    }
+                    int splitKeySize = tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
+                    splitKey.initData(splitKeySize);
+                    tupleWriter.writeTupleFields(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount(),
+                            splitKey.getBuffer().array(), 0);
+                    splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+                    splitKey.setLeftPage(leafFrontier.pageId);
+
+                    propagateBulk(1, pagesToWrite);
+
+                    leafFrontier.pageId = freePageManager.takePage(metaFrame);
+
+                    ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
+
+                    queue.put(leafFrontier.page);
+                    for (ICachedPage c : pagesToWrite) {
+                        queue.put(c);
+                    }
+                    pagesToWrite.clear();
+
+                    splitKey.setRightPage(leafFrontier.pageId);
+                }
+                if (tupleSize > maxTupleSize) {
+                    final long dpid = BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId);
+                    // calculate required number of pages.
+                    int headerSize = Math.max(leafFrame.getPageHeaderSize(), interiorFrame.getPageHeaderSize());
+                    final int multiplier =
+                            (int) Math.ceil((double) tupleSize / (bufferCache.getPageSize() - headerSize));
+                    if (multiplier > 1) {
+                        leafFrontier.page = bufferCache.confiscateLargePage(dpid, multiplier,
+                                freePageManager.takeBlock(metaFrame, multiplier - 1));
+                    } else {
+                        leafFrontier.page = bufferCache.confiscatePage(dpid);
+                    }
+                    leafFrame.setPage(leafFrontier.page);
+                    leafFrame.initBuffer((byte) 0);
+                    ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
+                } else {
+                    final long dpid = BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId);
+                    leafFrontier.page = bufferCache.confiscatePage(dpid);
+                    leafFrame.setPage(leafFrontier.page);
+                    leafFrame.initBuffer((byte) 0);
+                }
+            } else {
+                if (verifyInput && leafFrame.getTupleCount() > 0) {
+                    leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
+                    verifyInputTuple(tuple, leafFrontier.lastTuple);
+                }
+            }
+            ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
+        } catch (HyracksDataException | RuntimeException e) {
+            cleanUp();
+            throw e;
+        }
+    }
+
+    protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws HyracksDataException {
+        // New tuple should be strictly greater than last tuple.
+        int cmpResult = cmp.compare(tuple, prevTuple);
+        if (cmpResult < 0) {
+            throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
+        }
+        if (cmpResult == 0) {
+            throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
+        }
+    }
+
+    protected void propagateBulk(int level, List<ICachedPage> pagesToWrite) throws HyracksDataException {
+        if (splitKey.getBuffer() == null) {
+            return;
+        }
+
+        if (level >= nodeFrontiers.size()) {
+            addLevel();
+        }
+
+        NodeFrontier frontier = nodeFrontiers.get(level);
+        interiorFrame.setPage(frontier.page);
+
+        ITupleReference tuple = splitKey.getTuple();
+        int tupleBytes = tupleWriter.bytesRequired(tuple, 0, cmp.getKeyFieldCount());
+        int spaceNeeded = tupleBytes + slotSize + 4;
+        if (tupleBytes > interiorFrame.getMaxTupleSize(bufferCache.getPageSize())) {
+            throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleBytes,
+                    interiorFrame.getMaxTupleSize(bufferCache.getPageSize()));
+        }
+
+        int spaceUsed = interiorFrame.getBuffer().capacity() - interiorFrame.getTotalFreeSpace();
+        if (spaceUsed + spaceNeeded > interiorMaxBytes) {
+
+            ISplitKey copyKey = splitKey.duplicate(leafFrame.getTupleWriter().createTupleReference());
+            tuple = copyKey.getTuple();
+
+            frontier.lastTuple.resetByTupleIndex(interiorFrame, interiorFrame.getTupleCount() - 1);
+            int splitKeySize = tupleWriter.bytesRequired(frontier.lastTuple, 0, cmp.getKeyFieldCount());
+            splitKey.initData(splitKeySize);
+            tupleWriter.writeTupleFields(frontier.lastTuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(),
+                    0);
+            splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+
+            ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
+            int finalPageId = freePageManager.takePage(metaFrame);
+            frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+            pagesToWrite.add(frontier.page);
+            splitKey.setLeftPage(finalPageId);
+
+            propagateBulk(level + 1, pagesToWrite);
+            frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
+            interiorFrame.setPage(frontier.page);
+            interiorFrame.initBuffer((byte) level);
+        }
+        ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
+    }
+
+    private void persistFrontiers(int level, int rightPage) throws HyracksDataException {
+        if (level >= nodeFrontiers.size()) {
+            rootPage = nodeFrontiers.get(level - 1).pageId;
+            releasedLatches = true;
+            return;
+        }
+        if (level < 1) {
+            ICachedPage lastLeaf = nodeFrontiers.get(level).page;
+            int lastLeafPage = nodeFrontiers.get(level).pageId;
+            lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, nodeFrontiers.get(level).pageId));
+            queue.put(lastLeaf);
+            nodeFrontiers.get(level).page = null;
+            persistFrontiers(level + 1, lastLeafPage);
+            return;
+        }
+        NodeFrontier frontier = nodeFrontiers.get(level);
+        interiorFrame.setPage(frontier.page);
+        //just finalize = the layer right above the leaves has correct righthand pointers already
+        if (rightPage < 0) {
+            throw new HyracksDataException("Error in index creation. Internal node appears to have no rightmost guide");
+        }
+        ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
+        int finalPageId = freePageManager.takePage(metaFrame);
+        frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+        queue.put(frontier.page);
+        frontier.pageId = finalPageId;
+
+        persistFrontiers(level + 1, finalPageId);
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        try {
+            persistFrontiers(0, -1);
+            super.end();
+        } catch (HyracksDataException | RuntimeException e) {
+            cleanUp();
+            throw e;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
new file mode 100644
index 0000000..9419706
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.api;
+
+/**
+ * A generic API for a Bulkloader to callback the index after finishing the bulkload operation.
+ */
+@FunctionalInterface
+public interface IBulkLoadFinalizer<R, T> {
+    public R finalizeBulkLoad(T arg);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 905c99d..82037de 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -19,26 +19,20 @@
 
 package org.apache.hyracks.storage.am.common.impls;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
 import org.apache.hyracks.storage.am.common.api.IPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -55,6 +49,7 @@
 
     protected final IBinaryComparatorFactory[] cmpFactories;
     protected final int fieldCount;
+    protected final IBulkLoadFinalizer<Void, Integer> rootPageSetter;
 
     protected FileReference file;
     private int fileId = -1;
@@ -73,6 +68,10 @@
         this.cmpFactories = cmpFactories;
         this.fieldCount = fieldCount;
         this.file = file;
+        rootPageSetter = (Integer newRootPage) -> {
+            rootPage = newRootPage;
+            return null;
+        };
     }
 
     @Override
@@ -227,99 +226,10 @@
         return fieldCount;
     }
 
-    public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
-        protected final MultiComparator cmp;
-        protected final int slotSize;
-        protected final int leafMaxBytes;
-        protected final int interiorMaxBytes;
-        protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<>();
-        protected final ITreeIndexMetadataFrame metaFrame;
-        protected final ITreeIndexTupleWriter tupleWriter;
-        protected ITreeIndexFrame leafFrame;
-        protected ITreeIndexFrame interiorFrame;
-        // Immutable bulk loaders write their root page at page -2, as needed e.g. by append-only file systems such as
-        // HDFS.  Since loading this tree relies on the root page actually being at that point, no further inserts into
-        // that tree are allowed.  Currently, this is not enforced.
-        protected boolean releasedLatches;
-        protected final IFIFOPageQueue queue;
-        protected List<ICachedPage> pagesToWrite;
-
-        public AbstractTreeIndexBulkLoader(float fillFactor) throws HyracksDataException {
-            leafFrame = leafFrameFactory.createFrame();
-            interiorFrame = interiorFrameFactory.createFrame();
-            metaFrame = freePageManager.createMetadataFrame();
-
-            queue = bufferCache.createFIFOQueue();
-
-            if (!isEmptyTree(leafFrame)) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
-            }
-
-            this.cmp = MultiComparator.create(cmpFactories);
-
-            leafFrame.setMultiComparator(cmp);
-            interiorFrame.setMultiComparator(cmp);
-
-            tupleWriter = leafFrame.getTupleWriter();
-
-            NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
-            leafFrontier.pageId = freePageManager.takePage(metaFrame);
-            leafFrontier.page =
-                    bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
-
-            interiorFrame.setPage(leafFrontier.page);
-            interiorFrame.initBuffer((byte) 0);
-            interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);
-
-            leafFrame.setPage(leafFrontier.page);
-            leafFrame.initBuffer((byte) 0);
-            leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
-            slotSize = leafFrame.getSlotSize();
-
-            nodeFrontiers.add(leafFrontier);
-            pagesToWrite = new ArrayList<>();
-        }
-
-        protected void handleException() throws HyracksDataException {
-            // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
-            for (NodeFrontier nodeFrontier : nodeFrontiers) {
-                ICachedPage frontierPage = nodeFrontier.page;
-                if (frontierPage.confiscated()) {
-                    bufferCache.returnPage(frontierPage, false);
-                }
-            }
-            for (ICachedPage pageToDiscard : pagesToWrite) {
-                bufferCache.returnPage(pageToDiscard, false);
-            }
-            releasedLatches = true;
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            bufferCache.finishQueue();
-            freePageManager.setRootPageId(rootPage);
-        }
-
-        protected void addLevel() throws HyracksDataException {
-            NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
-            frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
-            frontier.pageId = -1;
-            frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
-            interiorFrame.setPage(frontier.page);
-            interiorFrame.initBuffer((byte) nodeFrontiers.size());
-            nodeFrontiers.add(frontier);
-        }
-
-        public ITreeIndexFrame getLeafFrame() {
-            return leafFrame;
-        }
-
-        public void setLeafFrame(ITreeIndexFrame leafFrame) {
-            this.leafFrame = leafFrame;
-        }
-
-    }
-
+    /**
+     * TODO: Should this class be deleted? It has never been used.
+     *
+     */
     public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
         ITreeIndexAccessor accessor;
 
@@ -347,10 +257,6 @@
     @Override
     public long getMemoryAllocationSize() {
         return 0;
-    }
-
-    public IBinaryComparatorFactory[] getCmpFactories() {
-        return cmpFactories;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
new file mode 100644
index 0000000..8376150
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.IPageManager;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+
+    protected final MultiComparator cmp;
+    protected final int slotSize;
+    protected final int leafMaxBytes;
+    protected final int interiorMaxBytes;
+    protected final ArrayList<NodeFrontier> nodeFrontiers;
+    protected final ITreeIndexMetadataFrame metaFrame;
+    protected final ITreeIndexTupleWriter tupleWriter;
+    protected ITreeIndexFrame leafFrame;
+    protected ITreeIndexFrame interiorFrame;
+    // NOTE: this comment describes the bulk loader as a whole, not the field below. Immutable bulk loaders
+    // write their root page at page -2, as needed e.g. by append-only file systems such as HDFS. Since loading the
+    // tree relies on the root page actually being there, no further inserts are allowed; currently not enforced.
+    protected boolean releasedLatches;
+    protected final IFIFOPageQueue queue;
+    protected final List<ICachedPage> pagesToWrite;
+
+    protected final IBufferCache bufferCache;
+    protected final IPageManager freePageManager;
+    protected final int maxTupleSize;
+    protected final int fileId;
+    protected final IBulkLoadFinalizer<Void, Integer> rootPageSetter;
+    protected int rootPage;
+
+    public AbstractTreeIndexBulkLoader(AbstractTreeIndex index, float fillFactor, int maxTupleSize,
+            IBulkLoadFinalizer<Void, Integer> rootPageSetter) throws HyracksDataException {
+        this.leafFrame = index.getLeafFrameFactory().createFrame();
+
+        if (!index.isEmptyTree(leafFrame)) {
+            throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
+        }
+        this.bufferCache = index.getBufferCache();
+        this.freePageManager = index.getPageManager();
+        this.interiorFrame = index.getInteriorFrameFactory().createFrame();
+        metaFrame = freePageManager.createMetadataFrame();
+
+        queue = bufferCache.createFIFOQueue();
+
+        this.cmp = MultiComparator.create(index.getComparatorFactories());
+
+        leafFrame.setMultiComparator(cmp);
+        interiorFrame.setMultiComparator(cmp);
+
+        tupleWriter = leafFrame.getTupleWriter();
+
+        NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
+        leafFrontier.pageId = freePageManager.takePage(metaFrame);
+        leafFrontier.page =
+                bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(index.getFileId(), leafFrontier.pageId));
+
+        interiorFrame.setPage(leafFrontier.page);
+        interiorFrame.initBuffer((byte) 0);
+        interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);
+
+        leafFrame.setPage(leafFrontier.page);
+        leafFrame.initBuffer((byte) 0);
+        leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
+        slotSize = leafFrame.getSlotSize();
+
+        nodeFrontiers = new ArrayList<>();
+        pagesToWrite = new ArrayList<>();
+        nodeFrontiers.add(leafFrontier);
+        this.maxTupleSize = maxTupleSize;
+        fileId = index.getFileId();
+        this.rootPageSetter = rootPageSetter;
+
+    }
+
+    /* ********************************
+     * IIndexBulkLoader methods
+     * ********************************
+     */
+
+    @Override
+    public void end() throws HyracksDataException {
+        rootPageSetter.finalizeBulkLoad(rootPage);
+        bufferCache.finishQueue();
+        freePageManager.setRootPageId(rootPage);
+    }
+
+    /**
+     * Calling abort will release latches and unpin all pages
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        cleanUp();
+    }
+
+    /* ********************************
+     * TreeIndex additional methods
+     * ********************************
+     */
+    public ITreeIndexFrame getLeafFrame() {
+        return leafFrame;
+    }
+
+    protected int getFrameUsedSpace(ITreeIndexFrame frame) {
+        return frame.getBuffer().capacity() - frame.getTotalFreeSpace();
+    }
+
+    protected void addLevel() throws HyracksDataException {
+        NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
+        frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
+        frontier.pageId = -1;
+        frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
+        interiorFrame.setPage(frontier.page);
+        interiorFrame.initBuffer((byte) nodeFrontiers.size());
+        nodeFrontiers.add(frontier);
+    }
+
+    protected void cleanUp() throws HyracksDataException {
+        // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
+        for (NodeFrontier nodeFrontier : nodeFrontiers) {
+            ICachedPage frontierPage = nodeFrontier.page;
+            if (frontierPage.confiscated()) {
+                bufferCache.returnPage(frontierPage, false);
+            }
+        }
+        for (ICachedPage pageToDiscard : pagesToWrite) {
+            bufferCache.returnPage(pageToDiscard, false);
+        }
+        releasedLatches = true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
index 84857f4..168fcfd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -20,7 +20,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 4942eda..e39c14e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -19,8 +19,6 @@
 
 package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
 
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksCommonContext;
@@ -40,6 +38,7 @@
 import org.apache.hyracks.storage.am.btree.impls.DiskBTree;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -58,9 +57,6 @@
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
-import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 /**
  * An inverted index consists of two files: 1. a file storing (paginated)
@@ -103,12 +99,13 @@
     protected final int numTokenFields;
     protected final int numInvListKeys;
     protected final FileReference invListsFile;
+    protected final IBulkLoadFinalizer<Void, Integer> maxPageIdSetter;
     // Last page id of inverted-lists file (inclusive). Set during bulk load.
     protected int invListsMaxPageId = -1;
     protected boolean isOpen = false;
     protected boolean wasOpen = false;
 
-    public OnDiskInvertedIndex(IBufferCache bufferCache, IInvertedListBuilder invListBuilder,
+    public OnDiskInvertedIndex( /*NOSONAR*/ IBufferCache bufferCache, IInvertedListBuilder invListBuilder,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
             ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories, FileReference btreeFile,
             FileReference invListsFile, IPageManagerFactory pageManagerFactory) throws HyracksDataException {
@@ -127,6 +124,11 @@
         this.invListEndPageIdField = numTokenFields + 1;
         this.invListStartOffField = numTokenFields + 2;
         this.invListNumElementsField = numTokenFields + 3;
+
+        maxPageIdSetter = (Integer newMaxPageId) -> {
+            invListsMaxPageId = newMaxPageId;
+            return null;
+        };
     }
 
     @Override
@@ -221,168 +223,6 @@
         int numElements = IntegerPointable.getInteger(btreeTuple.getFieldData(invListNumElementsField),
                 btreeTuple.getFieldStart(invListNumElementsField));
         listCursor.reset(startPageId, endPageId, startOff, numElements);
-    }
-
-    public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
-        private final ArrayTupleBuilder btreeTupleBuilder;
-        private final ArrayTupleReference btreeTupleReference;
-        private final IIndexBulkLoader btreeBulkloader;
-
-        private int currentInvListStartPageId;
-        private int currentInvListStartOffset;
-        private final ArrayTupleBuilder lastTupleBuilder;
-        private final ArrayTupleReference lastTuple;
-
-        private int currentPageId;
-        private ICachedPage currentPage;
-        private final MultiComparator tokenCmp;
-        private final MultiComparator invListCmp;
-
-        private final boolean verifyInput;
-        private final MultiComparator allCmp;
-
-        private final IFIFOPageQueue queue;
-
-        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
-                boolean checkIfEmptyIndex, int startPageId) throws HyracksDataException {
-            this.verifyInput = verifyInput;
-            this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
-            this.invListCmp = MultiComparator.create(invListCmpFactories);
-            if (verifyInput) {
-                allCmp = MultiComparator.create(btree.getComparatorFactories(), invListCmpFactories);
-            } else {
-                allCmp = null;
-            }
-            this.btreeTupleBuilder = new ArrayTupleBuilder(btree.getFieldCount());
-            this.btreeTupleReference = new ArrayTupleReference();
-            this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
-            this.lastTuple = new ArrayTupleReference();
-            this.btreeBulkloader =
-                    btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
-            currentPageId = startPageId;
-            currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
-            invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-            queue = bufferCache.createFIFOQueue();
-        }
-
-        public void pinNextPage() throws HyracksDataException {
-            queue.put(currentPage);
-            currentPageId++;
-            currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
-        }
-
-        private void createAndInsertBTreeTuple() throws HyracksDataException {
-            // Build tuple.
-            btreeTupleBuilder.reset();
-            DataOutput output = btreeTupleBuilder.getDataOutput();
-            // Add key fields.
-            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
-            for (int i = 0; i < numTokenFields; i++) {
-                btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
-                        lastTuple.getFieldLength(i));
-            }
-            // Add inverted-list 'pointer' value fields.
-            try {
-                output.writeInt(currentInvListStartPageId);
-                btreeTupleBuilder.addFieldEndOffset();
-                output.writeInt(currentPageId);
-                btreeTupleBuilder.addFieldEndOffset();
-                output.writeInt(currentInvListStartOffset);
-                btreeTupleBuilder.addFieldEndOffset();
-                output.writeInt(invListBuilder.getListSize());
-                btreeTupleBuilder.addFieldEndOffset();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-            // Reset tuple reference and add it into the BTree load.
-            btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
-            btreeBulkloader.add(btreeTupleReference);
-        }
-
-        /**
-         * Assumptions:
-         * The first btree.getMultiComparator().getKeyFieldCount() fields in tuple
-         * are btree keys (e.g., a string token).
-         * The next invListCmp.getKeyFieldCount() fields in tuple are keys of the
-         * inverted list (e.g., primary key).
-         * Key fields of inverted list are fixed size.
-         */
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            boolean firstElement = lastTupleBuilder.getSize() == 0;
-            boolean startNewList = firstElement;
-            if (!firstElement) {
-                // If the current and the last token don't match, we start a new list.
-                lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
-                startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
-            }
-            if (startNewList) {
-                if (!firstElement) {
-                    // Create entry in btree for last inverted list.
-                    createAndInsertBTreeTuple();
-                }
-                if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                    pinNextPage();
-                    invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                    if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                        throw new IllegalStateException("Failed to create first inverted list.");
-                    }
-                }
-                currentInvListStartPageId = currentPageId;
-                currentInvListStartOffset = invListBuilder.getPos();
-            } else {
-                if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) {
-                    // Duplicate inverted-list element.
-                    return;
-                }
-            }
-
-            // Append to current inverted list.
-            if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
-                pinNextPage();
-                invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
-                    throw new IllegalStateException(
-                            "Failed to append element to inverted list after switching to a new page.");
-                }
-            }
-
-            if (verifyInput && lastTupleBuilder.getSize() != 0) {
-                if (allCmp.compare(tuple, lastTuple) <= 0) {
-                    throw new HyracksDataException(
-                            "Input stream given to OnDiskInvertedIndex bulk load is not sorted.");
-                }
-            }
-
-            // Remember last tuple by creating a copy.
-            // TODO: This portion can be optimized by only copying the token when it changes, and using the last appended inverted-list element as a reference.
-            lastTupleBuilder.reset();
-            for (int i = 0; i < tuple.getFieldCount(); i++) {
-                lastTupleBuilder.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-            }
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            // The last tuple builder is empty if add() was never called.
-            if (lastTupleBuilder.getSize() != 0) {
-                createAndInsertBTreeTuple();
-            }
-            btreeBulkloader.end();
-
-            if (currentPage != null) {
-                queue.put(currentPage);
-            }
-            invListsMaxPageId = currentPageId;
-            bufferCache.finishQueue();
-        }
-
-        @Override
-        public void abort() throws HyracksDataException {
-            if (btreeBulkloader != null) {
-                btreeBulkloader.abort();
-            }
-        }
     }
 
     @Override
@@ -533,8 +373,8 @@
     @Override
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws HyracksDataException {
-        return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                rootPageId);
+        return new OnDiskInvertedIndexBulkLoader(this, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                rootPageId, maxPageIdSetter);
     }
 
     @Override
@@ -636,4 +476,15 @@
         bufferCache.purgeHandle(fileId);
         fileId = -1;
     }
+
+    /**
+     * @return The file id of this index.
+     */
+    public int getFileId() {
+        return fileId;
+    }
+
+    public IInvertedListBuilder getInvertedListBuilder() {
+        return invListBuilder;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
new file mode 100644
index 0000000..0469149
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+/**
+ * TODO Clean:
+ * - Remove ArrayTupleBuilder
+ * - Use IFrameFieldAppender --> FrameFixedFieldAppender
+ * - Pass a frame (Frame size assumptions ?)
+ *
+ */
+public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
+    private final ArrayTupleBuilder btreeTupleBuilder;
+    private final ArrayTupleReference btreeTupleReference;
+    private final IIndexBulkLoader btreeBulkloader;
+
+    private int currentInvListStartPageId;
+    private int currentInvListStartOffset;
+    private final ArrayTupleBuilder lastTupleBuilder;
+    private final ArrayTupleReference lastTuple;
+
+    private int currentPageId;
+    private ICachedPage currentPage;
+    private final MultiComparator tokenCmp;
+    private final MultiComparator invListCmp;
+
+    private final boolean verifyInput;
+    private final MultiComparator allCmp;
+
+    private final IFIFOPageQueue queue;
+    private final IBufferCache bufferCache;
+
+    private final int fileId;
+    private final int numTokenFields;
+    private final int numInvListKeys;
+    private final IInvertedListBuilder invListBuilder;
+    private final IBulkLoadFinalizer<Void, Integer> finalizer;
+
+    public OnDiskInvertedIndexBulkLoader(OnDiskInvertedIndex index, float btreeFillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, int startPageId,
+            IBulkLoadFinalizer<Void, Integer> finalizer) throws HyracksDataException {
+        final BTree btree = index.getBTree();
+        this.fileId = index.getFileId();
+        this.bufferCache = index.getBufferCache();
+        this.verifyInput = verifyInput;
+        this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
+        this.invListCmp = MultiComparator.create(index.getInvListCmpFactories());
+        this.numInvListKeys = index.getInvListCmpFactories().length;
+        this.numTokenFields = btree.getComparatorFactories().length;
+        if (verifyInput) {
+            allCmp = MultiComparator.create(btree.getComparatorFactories(), index.getInvListCmpFactories());
+        } else {
+            allCmp = null;
+        }
+        this.btreeTupleBuilder = new ArrayTupleBuilder(btree.getFieldCount());
+        this.btreeTupleReference = new ArrayTupleReference();
+        this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
+        this.lastTuple = new ArrayTupleReference();
+        this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        currentPageId = startPageId;
+        currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
+        invListBuilder = index.getInvertedListBuilder();
+        invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
+        queue = bufferCache.createFIFOQueue();
+        this.finalizer = finalizer;
+    }
+
+    public void pinNextPage() throws HyracksDataException {
+        queue.put(currentPage);
+        currentPageId++;
+        currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
+    }
+
+    /**
+     * Builds a BTree tuple describing the inverted list that just ended and adds it
+     * to the BTree bulk loader. The tuple consists of the token key fields (copied
+     * from the last-seen tuple) followed by the list's location as four int fields:
+     * start page id, end page id (the current page), start offset within the start
+     * page, and the number of elements in the list.
+     */
+    private void createAndInsertBTreeTuple() throws HyracksDataException {
+        // Build tuple.
+        btreeTupleBuilder.reset();
+        DataOutput output = btreeTupleBuilder.getDataOutput();
+        // Add key fields.
+        lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
+        for (int i = 0; i < numTokenFields; i++) {
+            btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
+                    lastTuple.getFieldLength(i));
+        }
+        // Add inverted-list 'pointer' value fields: [start page, end page, start offset, list size].
+        try {
+            output.writeInt(currentInvListStartPageId);
+            btreeTupleBuilder.addFieldEndOffset();
+            output.writeInt(currentPageId);
+            btreeTupleBuilder.addFieldEndOffset();
+            output.writeInt(currentInvListStartOffset);
+            btreeTupleBuilder.addFieldEndOffset();
+            output.writeInt(invListBuilder.getListSize());
+            btreeTupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        // Reset tuple reference and add it into the BTree load.
+        btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
+        btreeBulkloader.add(btreeTupleReference);
+    }
+
+    /**
+     * Assumptions:
+     * The first btree.getMultiComparator().getKeyFieldCount() fields in tuple
+     * are btree keys (e.g., a string token).
+     * The next invListCmp.getKeyFieldCount() fields in tuple are keys of the
+     * inverted list (e.g., primary key).
+     * Key fields of inverted list are fixed size.
+     * <p>
+     * When the incoming token differs from the previous tuple's token, the previous
+     * list is finalized (a BTree entry pointing to it is created) and a new list is
+     * started, moving to a fresh page when the current one has no room. Consecutive
+     * duplicate inverted-list elements (same token and same list keys) are ignored.
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        // An empty lastTupleBuilder means this is the very first element ever added.
+        boolean firstElement = lastTupleBuilder.getSize() == 0;
+        boolean startNewList = firstElement;
+        if (!firstElement) {
+            // If the current and the last token don't match, we start a new list.
+            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
+            startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
+        }
+        if (startNewList) {
+            if (!firstElement) {
+                // Create entry in btree for last inverted list.
+                createAndInsertBTreeTuple();
+            }
+            if (!invListBuilder.startNewList(tuple, numTokenFields)) {
+                // Not enough room on the current page; retry on a fresh page.
+                pinNextPage();
+                invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
+                if (!invListBuilder.startNewList(tuple, numTokenFields)) {
+                    // NOTE(review): the message says "first", but this path can fail for any
+                    // list start, not only the first — consider rewording in a follow-up.
+                    throw new IllegalStateException("Failed to create first inverted list.");
+                }
+            }
+            currentInvListStartPageId = currentPageId;
+            currentInvListStartOffset = invListBuilder.getPos();
+        } else {
+            if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) {
+                // Duplicate inverted-list element.
+                return;
+            }
+        }
+
+        // Append to current inverted list.
+        if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
+            pinNextPage();
+            invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
+            if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
+                throw new IllegalStateException(
+                        "Failed to append element to inverted list after switching to a new page.");
+            }
+        }
+
+        // Enforce the sorted-input contract (token-major, then list keys) when verification is on.
+        if (verifyInput && lastTupleBuilder.getSize() != 0) {
+            if (allCmp.compare(tuple, lastTuple) <= 0) {
+                throw new HyracksDataException("Input stream given to OnDiskInvertedIndex bulk load is not sorted.");
+            }
+        }
+
+        // Remember last tuple by creating a copy.
+        // TODO: This portion can be optimized by only copying the token when it changes, and using the last appended inverted-list element as a reference.
+        lastTupleBuilder.reset();
+        for (int i = 0; i < tuple.getFieldCount(); i++) {
+            lastTupleBuilder.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
+        }
+    }
+
+    /**
+     * Finalizes the load: creates the BTree entry for the last in-progress inverted
+     * list (if any), ends the BTree bulk load, queues the final inverted-list page
+     * for writing, invokes the finalizer, and finishes the page write queue.
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        // The last tuple builder is empty if add() was never called.
+        if (lastTupleBuilder.getSize() != 0) {
+            createAndInsertBTreeTuple();
+        }
+        btreeBulkloader.end();
+
+        if (currentPage != null) {
+            queue.put(currentPage);
+        }
+        // NOTE(review): currentPageId is the id of the last page confiscated for list data —
+        // confirm the finalizer contract expects the last *used* page, not the next free one.
+        finalizer.finalizeBulkLoad(currentPageId);
+        bufferCache.finishQueue();
+    }
+
+    /**
+     * Aborts the load: aborts the underlying BTree bulk loader and returns the
+     * currently confiscated inverted-list page to the buffer cache.
+     * NOTE(review): pages already placed on the write queue are not reclaimed here —
+     * verify that the queue owner releases them on abort (no page leak).
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        if (btreeBulkloader != null) {
+            btreeBulkloader.abort();
+        }
+
+        if (currentPage != null && currentPage.confiscated()) {
+            bufferCache.returnPage(currentPage);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 01e0684..7d6a2d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -40,7 +40,7 @@
 
 public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implements IPartitionedInvertedIndex {
 
-    protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+    protected static final int PARTITIONING_NUM_TOKENS_FIELD = 1;
 
     public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IInvertedListBuilder invListBuilder,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f29bffc..1bb7562 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -30,7 +30,7 @@
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
+import org.apache.hyracks.storage.am.btree.impls.BTreeBulkLoader;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 1e71b7f..2238c06 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -19,9 +19,7 @@
 
 package org.apache.hyracks.storage.am.rtree.impls;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -36,20 +34,15 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
 import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
-import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
 import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.util.TreeIndexUtils;
 import org.apache.hyracks.storage.am.rtree.api.IRTreeFrame;
 import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
-import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
 import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
-import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.IIndexCursor;
@@ -57,7 +50,6 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
-import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -901,210 +893,7 @@
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws HyracksDataException {
         // TODO: verifyInput currently does nothing.
-        return new RTreeBulkLoader(fillFactor);
-    }
-
-    public class RTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
-        ITreeIndexFrame lowerFrame, prevInteriorFrame;
-        RTreeTypeAwareTupleWriter interiorFrameTupleWriter =
-                ((RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter());
-        ITreeIndexTupleReference mbrTuple = interiorFrame.createTupleReference();
-        ByteBuffer mbr;
-        List<Integer> prevNodeFrontierPages = new ArrayList<>();
-
-        public RTreeBulkLoader(float fillFactor) throws HyracksDataException {
-            super(fillFactor);
-            prevInteriorFrame = interiorFrameFactory.createFrame();
-        }
-
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                int leafFrameTupleSize = leafFrame.getBytesRequiredToWriteTuple(tuple);
-                int interiorFrameTupleSize = interiorFrame.getBytesRequiredToWriteTuple(tuple);
-                int tupleSize = Math.max(leafFrameTupleSize, interiorFrameTupleSize);
-                if (tupleSize > maxTupleSize) {
-                    throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleSize, maxTupleSize);
-                }
-
-                NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
-                int spaceNeeded = leafFrameTupleSize;
-                int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
-
-                // try to free space by compression
-                if (spaceUsed + spaceNeeded > leafMaxBytes) {
-                    leafFrame.compress();
-                    spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
-                }
-
-                if (spaceUsed + spaceNeeded > leafMaxBytes) {
-
-                    if (prevNodeFrontierPages.size() == 0) {
-                        prevNodeFrontierPages.add(leafFrontier.pageId);
-                    } else {
-                        prevNodeFrontierPages.set(0, leafFrontier.pageId);
-                    }
-                    propagateBulk(1, false, pagesToWrite);
-
-                    leafFrontier.pageId = freePageManager.takePage(metaFrame);
-                    queue.put(leafFrontier.page);
-                    for (ICachedPage c : pagesToWrite) {
-                        queue.put(c);
-                    }
-
-                    pagesToWrite.clear();
-                    leafFrontier.page = bufferCache
-                            .confiscatePage(BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId));
-                    leafFrame.setPage(leafFrontier.page);
-                    leafFrame.initBuffer((byte) 0);
-
-                }
-
-                leafFrame.setPage(leafFrontier.page);
-                leafFrame.insert(tuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
-            } catch (HyracksDataException e) {
-                handleException();
-                throw e;
-            } catch (RuntimeException e) {
-                handleException();
-                throw e;
-            }
-
-        }
-
-        @Override
-        public void end() throws HyracksDataException {
-            pagesToWrite.clear();
-            //if writing a trivial 1-page tree, don't try and propagate up
-            if (nodeFrontiers.size() > 1) {
-                propagateBulk(1, true, pagesToWrite);
-            }
-
-            for (ICachedPage c : pagesToWrite) {
-                queue.put(c);
-            }
-            finish();
-            super.end();
-        }
-
-        @Override
-        public void abort() throws HyracksDataException {
-            super.handleException();
-        }
-
-        protected void finish() throws HyracksDataException {
-            int prevPageId = -1;
-            //here we assign physical identifiers to everything we can
-            for (NodeFrontier n : nodeFrontiers) {
-                //not a leaf
-                if (nodeFrontiers.indexOf(n) != 0) {
-                    interiorFrame.setPage(n.page);
-                    mbrTuple.resetByTupleOffset(mbr.array(), 0);
-                    interiorFrame.insert(mbrTuple, -1);
-                    interiorFrame.getBuffer().putInt(
-                            interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
-                            prevPageId);
-
-                    int finalPageId = freePageManager.takePage(metaFrame);
-                    n.pageId = finalPageId;
-                    n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-                    //else we are looking at a leaf
-                }
-                //set next guide MBR
-                //if propagateBulk didnt have to do anything this may be un-necessary
-                if (nodeFrontiers.size() > 1 && nodeFrontiers.indexOf(n) < nodeFrontiers.size() - 1) {
-                    lowerFrame.setPage(n.page);
-                    ((RTreeNSMFrame) lowerFrame).adjustMBR();
-                    interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
-                }
-                queue.put(n.page);
-                n.page = null;
-                prevPageId = n.pageId;
-            }
-            rootPage = nodeFrontiers.get(nodeFrontiers.size() - 1).pageId;
-            releasedLatches = true;
-        }
-
-        protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite)
-                throws HyracksDataException {
-            boolean propagated = false;
-
-            if (level == 1) {
-                lowerFrame = leafFrame;
-            }
-
-            if (lowerFrame.getTupleCount() == 0) {
-                return;
-            }
-
-            if (level >= nodeFrontiers.size()) {
-                addLevel();
-            }
-
-            //adjust the tuple pointers of the lower frame to allow us to calculate our MBR
-            //if this is a leaf, then there is only one tuple, so this is trivial
-            ((RTreeNSMFrame) lowerFrame).adjustMBR();
-
-            if (mbr == null) {
-                int bytesRequired =
-                        interiorFrameTupleWriter.bytesRequired(((RTreeNSMFrame) lowerFrame).getMBRTuples()[0], 0,
-                                cmp.getKeyFieldCount()) + ((RTreeNSMInteriorFrame) interiorFrame).getChildPointerSize();
-                mbr = ByteBuffer.allocate(bytesRequired);
-            }
-            interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
-            mbrTuple.resetByTupleOffset(mbr.array(), 0);
-
-            NodeFrontier frontier = nodeFrontiers.get(level);
-            interiorFrame.setPage(frontier.page);
-            //see if we have space for two tuples. this works around a  tricky boundary condition with sequential bulk
-            // load where finalization can possibly lead to a split
-            //TODO: accomplish this without wasting 1 tuple
-            int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() + RTreeNSMInteriorFrame.childPtrSize);
-            FrameOpSpaceStatus spaceForTwoTuples =
-                    (((RTreeNSMInteriorFrame) interiorFrame).hasSpaceInsert(sizeOfTwoTuples));
-            if (spaceForTwoTuples != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
-
-                int finalPageId = freePageManager.takePage(metaFrame);
-                if (prevNodeFrontierPages.size() <= level) {
-                    prevNodeFrontierPages.add(finalPageId);
-                } else {
-                    prevNodeFrontierPages.set(level, finalPageId);
-                }
-                frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-                pagesToWrite.add(frontier.page);
-                lowerFrame = prevInteriorFrame;
-                lowerFrame.setPage(frontier.page);
-
-                frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
-                interiorFrame.setPage(frontier.page);
-                interiorFrame.initBuffer((byte) level);
-
-                interiorFrame.insert(mbrTuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
-
-                interiorFrame.getBuffer().putInt(
-                        interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
-                        prevNodeFrontierPages.get(level - 1));
-
-                propagateBulk(level + 1, toRoot, pagesToWrite);
-            } else if (interiorFrame.hasSpaceInsert(mbrTuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE
-                    && !toRoot) {
-
-                interiorFrame.insert(mbrTuple, -1);
-
-                interiorFrame.getBuffer().putInt(
-                        interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
-                        prevNodeFrontierPages.get(level - 1));
-            }
-
-            if (toRoot && level < nodeFrontiers.size() - 1) {
-                lowerFrame = prevInteriorFrame;
-                lowerFrame.setPage(frontier.page);
-                propagateBulk(level + 1, true, pagesToWrite);
-            }
-
-            leafFrame.setPage(nodeFrontiers.get(0).page);
-        }
+        return new RTreeBulkLoader(this, fillFactor, maxTupleSize, rootPageSetter);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
new file mode 100644
index 0000000..8fa693c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.rtree.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
+import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
+import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
+import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+/**
+ * Sequential bulk loader for {@link RTree}. Tuples are appended to leaf frames in
+ * arrival order; when a leaf fills, the minimum bounding rectangle (MBR) of the
+ * finished frame is propagated up to the interior levels via
+ * {@link #propagateBulk}. Note that input verification is currently a no-op for
+ * this loader (see RTree#createBulkLoader).
+ */
+public class RTreeBulkLoader extends AbstractTreeIndexBulkLoader {
+
+    // Interior frame reused to read back the previously finished frame of a level.
+    private final ITreeIndexFrame prevInteriorFrame;
+    // The frame whose MBR is being propagated upward: leafFrame at level 1, else prevInteriorFrame.
+    private ITreeIndexFrame lowerFrame;
+    private final RTreeTypeAwareTupleWriter interiorFrameTupleWriter;
+    // Reusable tuple reference over the serialized MBR held in 'mbr'.
+    private final ITreeIndexTupleReference mbrTuple;
+    // Final page id assigned to each level's frontier the last time it was flushed, indexed by level.
+    private final List<Integer> prevNodeFrontierPages = new ArrayList<>();
+    // Scratch buffer for the serialized MBR tuple; lazily allocated on first propagation.
+    private ByteBuffer mbr;
+
+    public RTreeBulkLoader(RTree index, float fillFactor, int maxTupleSize, IBulkLoadFinalizer<Void, Integer> finalizer)
+            throws HyracksDataException {
+        super(index, fillFactor, maxTupleSize, finalizer);
+        prevInteriorFrame = index.getInteriorFrameFactory().createFrame();
+        interiorFrameTupleWriter = (RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter();
+        mbrTuple = interiorFrame.createTupleReference();
+    }
+
+    /**
+     * Appends a tuple to the current leaf frame. If the leaf would overflow (even
+     * after compression), the full leaf is flushed, its MBR is propagated to the
+     * interior levels, and a fresh leaf page is confiscated before inserting.
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            int leafFrameTupleSize = leafFrame.getBytesRequiredToWriteTuple(tuple);
+            int interiorFrameTupleSize = interiorFrame.getBytesRequiredToWriteTuple(tuple);
+            int tupleSize = Math.max(leafFrameTupleSize, interiorFrameTupleSize);
+            if (tupleSize > maxTupleSize) {
+                throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleSize, maxTupleSize);
+            }
+
+            NodeFrontier leafFrontier = nodeFrontiers.get(0);
+
+            int spaceNeeded = leafFrameTupleSize;
+            int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+
+            // try to free space by compression
+            if (spaceUsed + spaceNeeded > leafMaxBytes) {
+                leafFrame.compress();
+                spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+            }
+
+            if (spaceUsed + spaceNeeded > leafMaxBytes) {
+
+                // Record the flushed leaf's page id; the parent level reads it as the child pointer.
+                if (prevNodeFrontierPages.isEmpty()) {
+                    prevNodeFrontierPages.add(leafFrontier.pageId);
+                } else {
+                    prevNodeFrontierPages.set(0, leafFrontier.pageId);
+                }
+                propagateBulk(1, false, pagesToWrite);
+
+                leafFrontier.pageId = freePageManager.takePage(metaFrame);
+                queue.put(leafFrontier.page);
+                for (ICachedPage c : pagesToWrite) {
+                    queue.put(c);
+                }
+
+                pagesToWrite.clear();
+                leafFrontier.page =
+                        bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
+                leafFrame.setPage(leafFrontier.page);
+                leafFrame.initBuffer((byte) 0);
+
+            }
+
+            leafFrame.setPage(leafFrontier.page);
+            leafFrame.insert(tuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
+        } catch (HyracksDataException e) {
+            cleanUp();
+            throw e;
+        } catch (RuntimeException e) {
+            // NOTE(review): unclear which callee can throw RuntimeException here — TODO confirm
+            // and narrow this catch if possible; pages are cleaned up before rethrowing either way.
+            cleanUp();
+            throw e;
+        }
+
+    }
+
+    /**
+     * Completes the load: propagates the last leaf's MBR to the upper levels
+     * (unless the tree is a single page), queues the remaining interior pages
+     * for writing, assigns final page ids in {@link #finish()}, and then calls
+     * super.end().
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        pagesToWrite.clear();
+        // if writing a trivial 1-page tree, don't try and propagate up
+        if (nodeFrontiers.size() > 1) {
+            propagateBulk(1, true, pagesToWrite);
+        }
+
+        for (ICachedPage c : pagesToWrite) {
+            queue.put(c);
+        }
+        finish();
+        super.end();
+    }
+
+    /**
+     * Walks the node frontiers bottom-up, assigning physical page ids to interior
+     * nodes, inserting each lower level's MBR with a child pointer to the previous
+     * level's page, and queueing every frontier page for writing. The topmost
+     * frontier becomes the root page.
+     */
+    protected void finish() throws HyracksDataException {
+        int prevPageId = -1;
+        //here we assign physical identifiers to everything we can
+        for (NodeFrontier n : nodeFrontiers) {
+            //not a leaf
+            if (nodeFrontiers.indexOf(n) != 0) {
+                interiorFrame.setPage(n.page);
+                mbrTuple.resetByTupleOffset(mbr.array(), 0);
+                interiorFrame.insert(mbrTuple, -1);
+                // Patch the child pointer (stored right after the MBR tuple) to the lower level's page.
+                interiorFrame.getBuffer().putInt(
+                        interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                        prevPageId);
+
+                int finalPageId = freePageManager.takePage(metaFrame);
+                n.pageId = finalPageId;
+                n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+                //else we are looking at a leaf
+            }
+            //set next guide MBR
+            //if propagateBulk didn't have to do anything this may be unnecessary
+            if (nodeFrontiers.size() > 1 && nodeFrontiers.indexOf(n) < nodeFrontiers.size() - 1) {
+                lowerFrame.setPage(n.page);
+                ((RTreeNSMFrame) lowerFrame).adjustMBR();
+                interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
+            }
+            queue.put(n.page);
+            n.page = null;
+            prevPageId = n.pageId;
+        }
+        rootPage = nodeFrontiers.get(nodeFrontiers.size() - 1).pageId;
+        releasedLatches = true;
+    }
+
+    /**
+     * Computes the MBR of {@code lowerFrame} (the leaf when {@code level == 1}) and
+     * inserts it, with a child pointer to the lower level's page, into the interior
+     * frame at {@code level}. If the interior frame lacks room for two tuples (head
+     * room kept because finalization could otherwise force a split), the frame is
+     * flushed to {@code pagesToWrite}, a fresh page is started, and the propagation
+     * recurses to the next level. When {@code toRoot} is true, propagation continues
+     * all the way to the top so end() can seal the tree.
+     */
+    protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite)
+            throws HyracksDataException {
+
+        if (level == 1) {
+            lowerFrame = leafFrame;
+        }
+
+        if (lowerFrame.getTupleCount() == 0) {
+            return;
+        }
+
+        if (level >= nodeFrontiers.size()) {
+            addLevel();
+        }
+
+        //adjust the tuple pointers of the lower frame to allow us to calculate our MBR
+        //if this is a leaf, then there is only one tuple, so this is trivial
+        ((RTreeNSMFrame) lowerFrame).adjustMBR();
+
+        if (mbr == null) {
+            // Lazily size the MBR scratch buffer: key fields plus the interior child pointer.
+            int bytesRequired = interiorFrameTupleWriter.bytesRequired(((RTreeNSMFrame) lowerFrame).getMBRTuples()[0],
+                    0, cmp.getKeyFieldCount()) + ((RTreeNSMInteriorFrame) interiorFrame).getChildPointerSize();
+            mbr = ByteBuffer.allocate(bytesRequired);
+        }
+        interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
+        mbrTuple.resetByTupleOffset(mbr.array(), 0);
+
+        NodeFrontier frontier = nodeFrontiers.get(level);
+        interiorFrame.setPage(frontier.page);
+        //see if we have space for two tuples. this works around a tricky boundary condition with sequential bulk
+        // load where finalization can possibly lead to a split
+        //TODO: accomplish this without wasting 1 tuple
+        int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() + RTreeNSMInteriorFrame.childPtrSize);
+        FrameOpSpaceStatus spaceForTwoTuples = ((RTreeNSMInteriorFrame) interiorFrame).hasSpaceInsert(sizeOfTwoTuples);
+        if (spaceForTwoTuples != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
+
+            // Frame is (nearly) full: assign it a final page id, flush it, and start a fresh frame.
+            int finalPageId = freePageManager.takePage(metaFrame);
+            if (prevNodeFrontierPages.size() <= level) {
+                prevNodeFrontierPages.add(finalPageId);
+            } else {
+                prevNodeFrontierPages.set(level, finalPageId);
+            }
+            frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+            pagesToWrite.add(frontier.page);
+            lowerFrame = prevInteriorFrame;
+            lowerFrame.setPage(frontier.page);
+
+            frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
+            interiorFrame.setPage(frontier.page);
+            interiorFrame.initBuffer((byte) level);
+
+            interiorFrame.insert(mbrTuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
+
+            // Patch the child pointer after the MBR tuple to the lower level's final page id.
+            interiorFrame.getBuffer().putInt(
+                    interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                    prevNodeFrontierPages.get(level - 1));
+
+            propagateBulk(level + 1, toRoot, pagesToWrite);
+        } else if (interiorFrame.hasSpaceInsert(mbrTuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE
+                && !toRoot) {
+
+            interiorFrame.insert(mbrTuple, -1);
+
+            interiorFrame.getBuffer().putInt(
+                    interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                    prevNodeFrontierPages.get(level - 1));
+        }
+
+        if (toRoot && level < nodeFrontiers.size() - 1) {
+            lowerFrame = prevInteriorFrame;
+            lowerFrame.setPage(frontier.page);
+            propagateBulk(level + 1, true, pagesToWrite);
+        }
+
+        leafFrame.setPage(nodeFrontiers.get(0).page);
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2367
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7f42a391a4de4b02acf6a8fdaf2b60818c1da806
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Wail Alkowaileet <wael.y.k@gmail.com>


Mime
View raw message