asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From il...@apache.org
Subject [4/6] asterixdb git commit: [NO ISSUE][STO][IDX] LSM storage cleanup
Date Mon, 16 Oct 2017 06:08:04 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 18121e0..13543e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -65,7 +65,7 @@ public interface ILSMMemoryComponent extends ILSMComponent {
     /**
      * request the component to be active
      */
-    void activate();
+    void requestActivation();
 
     /**
      * Set the component state
@@ -74,4 +74,31 @@ public interface ILSMMemoryComponent extends ILSMComponent {
      *            the new state
      */
     void setState(ComponentState state);
+
+    /**
+     * Allocates memory to this component, create and activate it
+     *
+     * @throws HyracksDataException
+     */
+    void allocate() throws HyracksDataException;
+
+    /**
+     * Deactivete the memory component, destroy it, and deallocates its memory
+     *
+     * @throws HyracksDataException
+     */
+    void deallocate() throws HyracksDataException;
+
+    /**
+     * Test method
+     * TODO: Get rid of it
+     *
+     * @throws HyracksDataException
+     */
+    void validate() throws HyracksDataException;
+
+    /**
+     * @return the size of the memory component
+     */
+    long getSize();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 4386d52..280cc52 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -25,18 +25,27 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
 
     private final DiskComponentMetadata metadata;
+    private final AbstractLSMIndex lsmIndex;
 
-    public AbstractLSMDiskComponent(IMetadataPageManager mdPageManager, ILSMComponentFilter filter) {
+    public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
+            ILSMComponentFilter filter) {
         super(filter);
+        this.lsmIndex = lsmIndex;
         state = ComponentState.READABLE_UNWRITABLE;
         metadata = new DiskComponentMetadata(mdPageManager);
     }
 
     @Override
+    public AbstractLSMIndex getLsmIndex() {
+        return lsmIndex;
+    }
+
+    @Override
     public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) {
         if (state == ComponentState.INACTIVE) {
             throw new IllegalStateException("Trying to enter an inactive disk component");
@@ -110,4 +119,82 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
         //TODO: do we need to throw an exception when ID is not found?
         return new LSMDiskComponentId(minID, maxID);
     }
+
+    /**
+     * Mark the component as valid
+     *
+     * @param persist
+     *            whether the call should force data to disk before returning
+     * @throws HyracksDataException
+     */
+    @Override
+    public void markAsValid(boolean persist) throws HyracksDataException {
+        ComponentUtils.markAsValid(getMetadataHolder(), persist);
+    }
+
+    @Override
+    public void activate(boolean createNewComponent) throws HyracksDataException {
+        if (createNewComponent) {
+            getIndex().create();
+        }
+        getIndex().activate();
+        if (getLSMComponentFilter() != null && !createNewComponent) {
+            getLsmIndex().getFilterManager().readFilter(getLSMComponentFilter(), getMetadataHolder());
+        }
+    }
+
+    @Override
+    public void deactivateAndDestroy() throws HyracksDataException {
+        getIndex().deactivate();
+        getIndex().destroy();
+    }
+
+    @Override
+    public void destroy() throws HyracksDataException {
+        getIndex().destroy();
+    }
+
+    @Override
+    public void deactivate() throws HyracksDataException {
+        getIndex().deactivate();
+    }
+
+    @Override
+    public void deactivateAndPurge() throws HyracksDataException {
+        getIndex().deactivate();
+        getIndex().purge();
+    }
+
+    @Override
+    public void validate() throws HyracksDataException {
+        getIndex().validate();
+    }
+
+    @Override
+    public IChainedComponentBulkLoader createFilterBulkLoader() throws HyracksDataException {
+        return new FilterBulkLoader(getLSMComponentFilter(), getMetadataHolder(), getLsmIndex().getFilterManager(),
+                getLsmIndex().getTreeFields(), getLsmIndex().getFilterFields(),
+                MultiComparator.create(getLSMComponentFilter().getFilterCmpFactories()));
+    }
+
+    @Override
+    public IChainedComponentBulkLoader createIndexBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException {
+        return new LSMIndexBulkLoader(
+                getIndex().createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
+    }
+
+    @Override
+    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
+            throws HyracksDataException {
+        ChainedLSMDiskComponentBulkLoader chainedBulkLoader =
+                new ChainedLSMDiskComponentBulkLoader(this, cleanupEmptyComponent);
+        if (withFilter && getLsmIndex().getFilterFields() != null) {
+            chainedBulkLoader.addBulkLoader(createFilterBulkLoader());
+        }
+        chainedBulkLoader
+                .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
+        return chainedBulkLoader;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
deleted file mode 100644
index 351f619..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
-import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
-
-public abstract class AbstractLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader {
-    protected final ILSMDiskComponent component;
-
-    protected final IIndexBulkLoader indexBulkLoader;
-    protected final IIndexBulkLoader bloomFilterBuilder;
-
-    protected final ILSMComponentFilterManager filterManager;
-    protected final PermutingTupleReference indexTuple;
-    protected final PermutingTupleReference filterTuple;
-    protected final MultiComparator filterCmp;
-    protected final boolean cleanupEmptyComponent;
-
-    protected boolean cleanedUpArtifacts = false;
-    protected boolean isEmptyComponent = true;
-    protected boolean endedBloomFilterLoad = false;
-
-    //with filter
-    public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec,
-            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
-            boolean cleanupEmptyComponent, ILSMComponentFilterManager filterManager, int[] indexFields,
-            int[] filterFields, MultiComparator filterCmp) throws HyracksDataException {
-        this.component = component;
-        this.indexBulkLoader =
-                getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
-        if (bloomFilterSpec != null) {
-            this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-        } else {
-            this.bloomFilterBuilder = null;
-        }
-        this.cleanupEmptyComponent = cleanupEmptyComponent;
-        if (filterManager != null) {
-            this.filterManager = filterManager;
-            this.indexTuple = new PermutingTupleReference(indexFields);
-            this.filterTuple = new PermutingTupleReference(filterFields);
-            this.filterCmp = filterCmp;
-        } else {
-            this.filterManager = null;
-            this.indexTuple = null;
-            this.filterTuple = null;
-            this.filterCmp = null;
-        }
-    }
-
-    @Override
-    public void add(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            indexBulkLoader.add(t);
-            if (bloomFilterBuilder != null) {
-                bloomFilterBuilder.add(t);
-            }
-            updateFilter(tuple);
-
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
-    }
-
-    @Override
-    public void delete(ITupleReference tuple) throws HyracksDataException {
-        ILSMTreeTupleWriter tupleWriter =
-                (ILSMTreeTupleWriter) ((AbstractTreeIndexBulkLoader) indexBulkLoader).getLeafFrame().getTupleWriter();
-        tupleWriter.setAntimatter(true);
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            indexBulkLoader.add(t);
-
-            updateFilter(tuple);
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        } finally {
-            tupleWriter.setAntimatter(false);
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
-    }
-
-    @Override
-    public void abort() throws HyracksDataException {
-        if (indexBulkLoader != null) {
-            indexBulkLoader.abort();
-        }
-        if (bloomFilterBuilder != null) {
-            bloomFilterBuilder.abort();
-        }
-
-    }
-
-    @Override
-    public void end() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
-                bloomFilterBuilder.end();
-                endedBloomFilterLoad = true;
-            }
-
-            //use filter
-            if (filterManager != null && component.getLSMComponentFilter() != null) {
-                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
-            }
-            indexBulkLoader.end();
-
-            if (isEmptyComponent && cleanupEmptyComponent) {
-                cleanupArtifacts();
-            }
-        }
-    }
-
-    protected void cleanupArtifacts() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            cleanedUpArtifacts = true;
-            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
-                bloomFilterBuilder.abort();
-                endedBloomFilterLoad = true;
-            }
-            getIndex(component).deactivate();
-            getIndex(component).destroy();
-            if (bloomFilterBuilder != null) {
-                getBloomFilter(component).deactivate();
-                getBloomFilter(component).destroy();
-            }
-        }
-    }
-
-    protected void updateFilter(ITupleReference tuple) throws HyracksDataException {
-        if (filterTuple != null) {
-            filterTuple.reset(tuple);
-            component.getLSMComponentFilter().update(filterTuple, filterCmp);
-        }
-    }
-
-    /**
-     * TreeIndex is used to hold the filter tuple values
-     *
-     * @param component
-     * @return
-     */
-    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
-        return (ITreeIndex) getIndex(component);
-    }
-
-    protected abstract IIndex getIndex(ILSMDiskComponent component);
-
-    protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component);
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
deleted file mode 100644
index fc7310c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
-
-public abstract class AbstractLSMDiskComponentWithBuddyBulkLoader extends AbstractLSMDiskComponentBulkLoader {
-
-    protected final IIndexBulkLoader buddyBTreeBulkLoader;
-
-    //with filter
-    public AbstractLSMDiskComponentWithBuddyBulkLoader(ILSMDiskComponent component,
-            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex, boolean cleanupEmptyComponent, ILSMComponentFilterManager filterManager,
-            int[] indexFields, int[] filterFields, MultiComparator filterCmp) throws HyracksDataException {
-        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                cleanupEmptyComponent, filterManager, indexFields, filterFields, filterCmp);
-
-        // BuddyBTree must be created even if it could be empty,
-        // since without it the component is not considered as valid.
-        buddyBTreeBulkLoader =
-                getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
-    }
-
-    @Override
-    public void add(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            indexBulkLoader.add(t);
-
-            updateFilter(tuple);
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
-    }
-
-    @Override
-    public void delete(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            buddyBTreeBulkLoader.add(t);
-            if (bloomFilterBuilder != null) {
-                bloomFilterBuilder.add(t);
-            }
-
-            updateFilter(tuple);
-        } catch (HyracksDataException e) {
-            //deleting a key multiple times is OK
-            if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
-                cleanupArtifacts();
-                throw e;
-            }
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
-    }
-
-    @Override
-    public void abort() throws HyracksDataException {
-        super.abort();
-        if (buddyBTreeBulkLoader != null) {
-            buddyBTreeBulkLoader.abort();
-        }
-    }
-
-    @Override
-    public void end() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
-                bloomFilterBuilder.end();
-                endedBloomFilterLoad = true;
-            }
-
-            //use filter
-            if (filterManager != null && component.getLSMComponentFilter() != null) {
-                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
-            }
-            indexBulkLoader.end();
-            buddyBTreeBulkLoader.end();
-
-            if (isEmptyComponent && cleanupEmptyComponent) {
-                cleanupArtifacts();
-            }
-        }
-    }
-
-    @Override
-    protected void cleanupArtifacts() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            cleanedUpArtifacts = true;
-            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
-                bloomFilterBuilder.abort();
-                endedBloomFilterLoad = true;
-            }
-            getIndex(component).deactivate();
-            getIndex(component).destroy();
-
-            getBuddyBTree(component).deactivate();
-            getBuddyBTree(component).destroy();
-
-            if (bloomFilterBuilder != null) {
-                getBloomFilter(component).deactivate();
-                getBloomFilter(component).destroy();
-            }
-        }
-    }
-
-    protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component);
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index dc64f9b..50c7720 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
@@ -43,6 +44,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -90,11 +92,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     protected final AtomicBoolean[] flushRequests;
     protected boolean memoryComponentsAllocated = false;
     protected ITracer tracer;
+    // Factory for creating on-disk index components during flush and merge.
+    protected final ILSMDiskComponentFactory componentFactory;
+    // Factory for creating on-disk index components during bulkload.
+    protected final ILSMDiskComponentFactory bulkLoadComponentFactory;
 
     public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
             IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallback ioOpCallback, ILSMComponentFilterFrameFactory filterFrameFactory,
+            ILSMIOOperationCallback ioOpCallback, ILSMDiskComponentFactory componentFactory,
+            ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMComponentFilterFrameFactory filterFrameFactory,
             LSMComponentFilterManager filterManager, int[] filterFields, boolean durable,
             IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) {
         this.ioManager = ioManager;
@@ -104,6 +111,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.ioScheduler = ioScheduler;
         this.ioOpCallback = ioOpCallback;
+        this.componentFactory = componentFactory;
+        this.bulkLoadComponentFactory = bulkLoadComponentFactory;
         this.ioOpCallback.setNumOfMutableComponents(virtualBufferCaches.size());
         this.filterHelper = filterHelper;
         this.filterFrameFactory = filterFrameFactory;
@@ -127,13 +136,17 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     // The constructor used by external indexes
     public AbstractLSMIndex(IIOManager ioManager, IBufferCache diskBufferCache, ILSMIndexFileManager fileManager,
             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, boolean durable) {
+            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback,
+            ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
+            boolean durable) {
         this.ioManager = ioManager;
         this.diskBufferCache = diskBufferCache;
         this.fileManager = fileManager;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.ioScheduler = ioScheduler;
         this.ioOpCallback = ioOpCallback;
+        this.componentFactory = componentFactory;
+        this.bulkLoadComponentFactory = bulkLoadComponentFactory;
         this.durable = durable;
         lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
         isActive = false;
@@ -169,11 +182,14 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         isActive = true;
     }
 
-    protected void loadDiskComponents() throws HyracksDataException {
+    private void loadDiskComponents() throws HyracksDataException {
         diskComponents.clear();
         List<LSMComponentFileReferences> validFileReferences = fileManager.cleanupAndGetValidFiles();
-        for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
-            ILSMDiskComponent component = loadComponent(lsmComonentFileReference);
+        for (LSMComponentFileReferences lsmComponentFileReferences : validFileReferences) {
+            ILSMDiskComponent component =
+                    createDiskComponent(componentFactory, lsmComponentFileReferences.getInsertIndexFileReference(),
+                            lsmComponentFileReferences.getDeleteIndexFileReference(),
+                            lsmComponentFileReferences.getBloomFilterFileReference(), false);
             diskComponents.add(component);
         }
     }
@@ -192,7 +208,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
             flushMemoryComponent();
         }
         deactivateDiskComponents();
-        deactivateMemoryComponents();
+        deallocateMemoryComponents();
         isActive = false;
     }
 
@@ -208,17 +224,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         }
     }
 
-    protected void deactivateDiskComponents() throws HyracksDataException {
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMDiskComponent c : immutableComponents) {
-            deactivateDiskComponent(c);
+    private void deactivateDiskComponents() throws HyracksDataException {
+        for (ILSMDiskComponent c : diskComponents) {
+            c.deactivateAndPurge();
         }
     }
 
-    protected void deactivateMemoryComponents() throws HyracksDataException {
+    private void deallocateMemoryComponents() throws HyracksDataException {
         if (memoryComponentsAllocated) {
             for (ILSMMemoryComponent c : memoryComponents) {
-                deactivateMemoryComponent(c);
+                c.deallocate();
             }
             memoryComponentsAllocated = false;
         }
@@ -233,10 +248,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         fileManager.deleteDirs();
     }
 
-    protected void destroyDiskComponents() throws HyracksDataException {
-        List<ILSMDiskComponent> immutableComponents = diskComponents;
-        for (ILSMDiskComponent c : immutableComponents) {
-            destroyDiskComponent(c);
+    private void destroyDiskComponents() throws HyracksDataException {
+        for (ILSMDiskComponent c : diskComponents) {
+            c.destroy();
         }
     }
 
@@ -245,26 +259,30 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         if (!isActive) {
             throw HyracksDataException.create(ErrorCode.CANNOT_CLEAR_INACTIVE_INDEX);
         }
-        clearMemoryComponents();
-        clearDiskComponents();
+        resetMemoryComponents();
+        deactivateAndDestroyDiskComponents();
     }
 
-    private void clearDiskComponents() throws HyracksDataException {
+    private void deactivateAndDestroyDiskComponents() throws HyracksDataException {
         for (ILSMDiskComponent c : diskComponents) {
-            clearDiskComponent(c);
+            c.deactivateAndDestroy();
         }
         diskComponents.clear();
     }
 
-    protected void clearMemoryComponents() throws HyracksDataException {
-        if (memoryComponentsAllocated) {
+    private void resetMemoryComponents() throws HyracksDataException {
+        if (memoryComponentsAllocated && memoryComponents != null) {
             for (ILSMMemoryComponent c : memoryComponents) {
-                clearMemoryComponent(c);
+                c.reset();
             }
         }
     }
 
     @Override
+    public void purge() throws HyracksDataException {
+    }
+
+    @Override
     public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
         List<ILSMDiskComponent> immutableComponents = diskComponents;
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
@@ -376,16 +394,37 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         return createBulkLoader(fillLevel, verifyInput, numElementsHint);
     }
 
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws HyracksDataException {
+        return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint);
+    }
+
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
+        LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+        return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true);
+    }
+
+    protected ILSMDiskComponent createDiskComponent(ILSMDiskComponentFactory factory, FileReference insertFileReference,
+            FileReference deleteIndexFileReference, FileReference bloomFilterFileRef, boolean createComponent)
+            throws HyracksDataException {
+        ILSMDiskComponent component = factory.createComponent(this,
+                new LSMComponentFileReferences(insertFileReference, deleteIndexFileReference, bloomFilterFileRef));
+        component.activate(createComponent);
+        return component;
+    }
+
     @Override
     public final synchronized void allocateMemoryComponents() throws HyracksDataException {
         if (!isActive) {
             throw HyracksDataException.create(ErrorCode.CANNOT_ALLOCATE_MEMORY_FOR_INACTIVE_INDEX);
         }
-        if (memoryComponentsAllocated) {
+        if (memoryComponentsAllocated || memoryComponents == null) {
             return;
         }
         for (ILSMMemoryComponent c : memoryComponents) {
-            allocateMemoryComponent(c);
+            c.allocate();
         }
         memoryComponentsAllocated = true;
     }
@@ -410,7 +449,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     @Override
     public void changeMutableComponent() {
         currentMutableComponentId.set((currentMutableComponentId.get() + 1) % memoryComponents.size());
-        memoryComponents.get(currentMutableComponentId.get()).activate();
+        memoryComponents.get(currentMutableComponentId.get()).requestActivation();
     }
 
     @Override
@@ -507,8 +546,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         Set<String> componentFiles = new HashSet<>();
 
         //get set of files to be replicated for each component
-        for (ILSMComponent lsmComponent : lsmComponents) {
-            componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent));
+        for (ILSMDiskComponent lsmComponent : lsmComponents) {
+            componentFiles.addAll(lsmComponent.getLSMComponentPhysicalFiles());
         }
 
         ReplicationExecutionType executionType;
@@ -595,11 +634,11 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     public final void validate() throws HyracksDataException {
         if (memoryComponentsAllocated) {
             for (ILSMMemoryComponent c : memoryComponents) {
-                validateMemoryComponent(c);
+                c.validate();
             }
         }
         for (ILSMDiskComponent c : diskComponents) {
-            validateDiskComponent(c);
+            c.validate();
         }
     }
 
@@ -607,7 +646,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     public long getMemoryAllocationSize() {
         long size = 0;
         for (ILSMMemoryComponent c : memoryComponents) {
-            size += getMemoryComponentSize(c);
+            size += c.getSize();
         }
         return size;
     }
@@ -628,31 +667,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
                 : doMerge(operation);
     }
 
-    public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
-
-    protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
-
-    protected abstract IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
-            throws HyracksDataException;
-
-    protected abstract ILSMDiskComponent loadComponent(LSMComponentFileReferences refs) throws HyracksDataException;
-
-    protected abstract void deactivateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
-
-    protected abstract void deactivateDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
-
-    protected abstract void destroyDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
-
-    protected abstract void clearDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
-
-    protected abstract void clearMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
-
-    protected abstract void validateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
-
-    protected abstract void validateDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
-
-    protected abstract long getMemoryComponentSize(ILSMMemoryComponent c);
-
     protected abstract LSMComponentFileReferences getMergeFileReferences(ILSMDiskComponent firstComponent,
             ILSMDiskComponent lastComponent) throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 1ee68d9..a4fe35c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent {
 
@@ -178,7 +179,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
-    public void activate() {
+    public void requestActivation() {
         requestedToBeActive = true;
     }
 
@@ -198,12 +199,20 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
-    public void reset() throws HyracksDataException {
+    public final void reset() throws HyracksDataException {
         isModified.set(false);
         metadata.reset();
         if (filter != null) {
             filter.reset();
         }
+        doReset();
+    }
+
+    protected void doReset() throws HyracksDataException {
+        getIndex().deactivate();
+        getIndex().destroy();
+        getIndex().create();
+        getIndex().activate();
     }
 
     @Override
@@ -215,4 +224,37 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     public MemoryComponentMetadata getMetadata() {
         return metadata;
     }
+
+    @Override
+    public final void allocate() throws HyracksDataException {
+        ((IVirtualBufferCache) getIndex().getBufferCache()).open();
+        doAllocate();
+    }
+
+    protected void doAllocate() throws HyracksDataException {
+        getIndex().create();
+        getIndex().activate();
+    }
+
+    @Override
+    public final void deallocate() throws HyracksDataException {
+        doDeallocate();
+        getIndex().getBufferCache().close();
+    }
+
+    protected void doDeallocate() throws HyracksDataException {
+        getIndex().deactivate();
+        getIndex().destroy();
+    }
+
+    @Override
+    public void validate() throws HyracksDataException {
+        getIndex().validate();
+    }
+
+    @Override
+    public long getSize() {
+        IBufferCache virtualBufferCache = getIndex().getBufferCache();
+        return virtualBufferCache.getNumPages() * (long) virtualBufferCache.getPageSize();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
new file mode 100644
index 0000000..0dcf349
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+
+public class BloomFilterBulkLoader implements IChainedComponentBulkLoader {
+
+    private final IIndexBulkLoader bulkLoader;
+
+    public BloomFilterBulkLoader(IIndexBulkLoader bulkLoader) {
+        this.bulkLoader = bulkLoader;
+    }
+
+    private boolean endedBloomFilterLoad = false;
+
+    @Override
+    public ITupleReference add(ITupleReference tuple) throws HyracksDataException {
+        bulkLoader.add(tuple);
+        return tuple;
+    }
+
+    @Override
+    public ITupleReference delete(ITupleReference tuple) throws HyracksDataException {
+        //Noop
+        return tuple;
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!endedBloomFilterLoad) {
+            bulkLoader.end();
+            endedBloomFilterLoad = true;
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        bulkLoader.abort();
+    }
+
+    @Override
+    public void cleanupArtifacts() throws HyracksDataException {
+        if (!endedBloomFilterLoad) {
+            bulkLoader.abort();
+            endedBloomFilterLoad = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..f38614c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+
+/**
+ * Class encapsulates a chain of operations, happening during an LSM disk component bulkload
+ */
+public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader {
+
+    private List<IChainedComponentBulkLoader> bulkloaderChain = new LinkedList<>();
+    private boolean isEmptyComponent = true;
+    private boolean cleanedUpArtifacts = false;
+    private final ILSMDiskComponent diskComponent;
+    private final boolean cleanupEmptyComponent;
+
+    public ChainedLSMDiskComponentBulkLoader(ILSMDiskComponent diskComponent, boolean cleanupEmptyComponent) {
+        this.diskComponent = diskComponent;
+        this.cleanupEmptyComponent = cleanupEmptyComponent;
+    }
+
+    public void addBulkLoader(IChainedComponentBulkLoader bulkloader) {
+        bulkloaderChain.add(bulkloader);
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t = tuple;
+            for (IChainedComponentBulkLoader lsmBulkloader : bulkloaderChain) {
+                t = lsmBulkloader.add(t);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void delete(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t = tuple;
+            for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
+                t = lsmOperation.delete(t);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
+                lsmOperation.cleanupArtifacts();
+            }
+        }
+        diskComponent.deactivateAndDestroy();
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
+                lsmOperation.end();
+            }
+            if (isEmptyComponent && cleanupEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
+            lsmOperation.abort();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 0134dca..f2751bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -18,12 +18,17 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.Collections;
+import java.util.Set;
+
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IIndex;
 
 public class EmptyComponent implements ILSMDiskComponent {
     public static final EmptyComponent INSTANCE = new EmptyComponent();
@@ -53,6 +58,11 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
+    public IIndex getIndex() {
+        return null;
+    }
+
+    @Override
     public DiskComponentMetadata getMetadata() {
         return EmptyDiskComponentMetadata.INSTANCE;
     }
@@ -78,8 +88,65 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
+    public AbstractLSMIndex getLsmIndex() {
+        return null;
+    }
+
+    @Override
+    public ITreeIndex getMetadataHolder() {
+        return null;
+    }
+
+    @Override
+    public Set<String> getLSMComponentPhysicalFiles() {
+        return Collections.emptySet();
+    }
+
+    @Override
     public void markAsValid(boolean persist) throws HyracksDataException {
         // No Op
     }
 
+    @Override
+    public void activate(boolean createNewComponent) throws HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public void deactivateAndDestroy() throws HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public void deactivate() throws HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public void deactivateAndPurge() throws HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public void validate() throws HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public IChainedComponentBulkLoader createFilterBulkLoader() throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    public IChainedComponentBulkLoader createIndexBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException {
+        return null;
+    }
+
+    @Override
+    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
+            throws HyracksDataException {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 2f65b18..11f2441 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -135,7 +135,7 @@ public class ExternalIndexHarness extends LSMHarness {
                                 lsmIndex.scheduleReplication(null, componentsToBeReplicated, false,
                                         ReplicationOperation.DELETE, opType);
                             }
-                            ((ILSMDiskComponent) c).destroy();
+                            ((ILSMDiskComponent) c).deactivateAndDestroy();
                             break;
                         default:
                             break;
@@ -348,7 +348,7 @@ public class ExternalIndexHarness extends LSMHarness {
                 componentsToBeReplicated.add(diskComponent);
                 lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null);
             }
-            diskComponent.destroy();
+            diskComponent.deactivateAndDestroy();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
new file mode 100644
index 0000000..7359d2b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class FilterBulkLoader implements IChainedComponentBulkLoader {
+
+    private final ILSMComponentFilter filter;
+    private final ITreeIndex treeIndex;
+    protected final ILSMComponentFilterManager filterManager;
+    protected final PermutingTupleReference indexTuple;
+    protected final PermutingTupleReference filterTuple;
+    protected final MultiComparator filterCmp;
+
+    public FilterBulkLoader(ILSMComponentFilter filter, ITreeIndex treeIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) {
+        this.filter = filter;
+        this.treeIndex = treeIndex;
+        this.filterManager = filterManager;
+        this.indexTuple = new PermutingTupleReference(indexFields);
+        this.filterTuple = new PermutingTupleReference(filterFields);
+        this.filterCmp = filterCmp;
+    }
+
+    @Override
+    public ITupleReference delete(ITupleReference tuple) throws HyracksDataException {
+        indexTuple.reset(tuple);
+        updateFilter(tuple);
+        return indexTuple;
+    }
+
+    @Override
+    public void cleanupArtifacts() throws HyracksDataException {
+        //Noop
+    }
+
+    @Override
+    public ITupleReference add(ITupleReference tuple) throws HyracksDataException {
+        indexTuple.reset(tuple);
+        updateFilter(tuple);
+        return indexTuple;
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        filterManager.writeFilter(filter, treeIndex);
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        //Noop
+    }
+
+    private void updateFilter(ITupleReference tuple) throws HyracksDataException {
+        filterTuple.reset(tuple);
+        filter.update(filterTuple, filterCmp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
new file mode 100644
index 0000000..90ef127
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface IChainedComponentBulkLoader {
+    /**
+     * Adds a tuple to the bulkloaded component
+     *
+     * @param tuple
+     * @return Potentially modified tuple, which is used as an input for downstream bulkloaders
+     * @throws HyracksDataException
+     */
+    ITupleReference add(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Deletes a tuple (i.e. appends anti-matter tuple or deleted-key tuple) from the bulkloaded component
+     *
+     * @param tuple
+     * @return Potentially modified tuple, which is used as an input for downstream bulkloaders
+     * @throws HyracksDataException
+     */
+    ITupleReference delete(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Correctly finalizes bulkloading process and releases all resources
+     *
+     * @throws HyracksDataException
+     */
+    void end() throws HyracksDataException;
+
+    /**
+     * Aborts bulkloading process without releasing associated resources
+     *
+     * @throws HyracksDataException
+     */
+    void abort() throws HyracksDataException;
+
+    /**
+     * Releases all resources allocated during the bulkloading process
+     *
+     * @throws HyracksDataException
+     */
+    void cleanupArtifacts() throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
new file mode 100644
index 0000000..4fb2919
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+
+public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader {
+
+    private final IIndexBulkLoader bulkLoader;
+    private final IIndexBulkLoader buddyBTreeBulkLoader;
+
+    public IndexWithBuddyBulkLoader(IIndexBulkLoader bulkLoader, IIndexBulkLoader buddyBTreeBulkLoader) {
+        this.bulkLoader = bulkLoader;
+        this.buddyBTreeBulkLoader = buddyBTreeBulkLoader;
+    }
+
+    @Override
+    public ITupleReference delete(ITupleReference tuple) throws HyracksDataException {
+        try {
+            buddyBTreeBulkLoader.add(tuple);
+        } catch (HyracksDataException e) {
+            //deleting a key multiple times is OK
+            if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
+                cleanupArtifacts();
+                throw e;
+            }
+        }
+        return tuple;
+    }
+
+    @Override
+    public void cleanupArtifacts() throws HyracksDataException {
+        //Noop
+    }
+
+    @Override
+    public ITupleReference add(ITupleReference tuple) throws HyracksDataException {
+        bulkLoader.add(tuple);
+        return tuple;
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        bulkLoader.end();
+        buddyBTreeBulkLoader.end();
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        bulkLoader.abort();
+        buddyBTreeBulkLoader.abort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 1bdd250..c8be9b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -356,8 +356,8 @@ public class LSMHarness implements ILSMHarness {
                         lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
                                 ReplicationOperation.DELETE, opType);
                     }
-                    for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
-                        ((AbstractLSMDiskComponent) c).destroy();
+                    for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) {
+                        c.deactivateAndDestroy();
                     }
                 } catch (Throwable e) {
                     if (LOGGER.isLoggable(Level.WARNING)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
new file mode 100644
index 0000000..84857f4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+
+public class LSMIndexBulkLoader implements IChainedComponentBulkLoader {
+    private final IIndexBulkLoader bulkLoader;
+
+    public LSMIndexBulkLoader(IIndexBulkLoader bulkLoader) {
+        this.bulkLoader = bulkLoader;
+    }
+
+    @Override
+    public ITupleReference delete(ITupleReference tuple) throws HyracksDataException {
+        ILSMTreeTupleWriter tupleWriter =
+                (ILSMTreeTupleWriter) ((AbstractTreeIndexBulkLoader) bulkLoader).getLeafFrame().getTupleWriter();
+        tupleWriter.setAntimatter(true);
+        try {
+            bulkLoader.add(tuple);
+        } finally {
+            tupleWriter.setAntimatter(false);
+        }
+        return tuple;
+    }
+
+    @Override
+    public void cleanupArtifacts() throws HyracksDataException {
+        //Noop
+    }
+
+    @Override
+    public ITupleReference add(ITupleReference tuple) throws HyracksDataException {
+        bulkLoader.add(tuple);
+        return tuple;
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        bulkLoader.end();
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        bulkLoader.abort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
new file mode 100644
index 0000000..8befee1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+
+public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
+    private final AbstractLSMIndex lsmIndex;
+    private final ILSMDiskComponent component;
+    private final IIndexBulkLoader componentBulkLoader;
+
+    public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
+            long numElementsHint) throws HyracksDataException {
+        this.lsmIndex = lsmIndex;
+        // Note that by using a flush target file name, we state that the
+        // new bulk loaded component is "newer" than any other merged component.
+        this.component = lsmIndex.createBulkLoadTarget();
+        this.componentBulkLoader =
+                component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        componentBulkLoader.add(tuple);
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        componentBulkLoader.end();
+        if (component.getComponentSize() > 0) {
+            //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+            //then after operation should be called from harness as well
+            //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+            lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+            lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        componentBulkLoader.abort();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77f89525/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
index a26e14c..6f583b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
@@ -41,13 +41,4 @@ public interface IInPlaceInvertedIndex extends IInvertedIndex {
      */
     void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey, IIndexOperationContext ictx)
             throws HyracksDataException;
-
-    /**
-     * Purge the index files out of the buffer cache.
-     * Can only be called if the caller is absolutely sure the files don't contain dirty pages
-     *
-     * @throws HyracksDataException
-     *             if the index is active
-     */
-    void purge() throws HyracksDataException;
 }


Mime
View raw message