asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] asterixdb git commit: Add LSMDiskComponentBulkLoader
Date Wed, 31 May 2017 18:47:56 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 745379ee1 -> 1ded1616b


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 13ff420..3618737 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -71,6 +71,7 @@ import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMRTree extends AbstractLSMRTree {
@@ -173,20 +174,39 @@ public class LSMRTree extends AbstractLSMRTree {
         RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor(false);
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(),
                 flushOp.getBTreeTarget(), flushOp.getBloomFilterTarget(), true);
-        RTree diskRTree = component.getRTree();
-        IIndexBulkLoader rTreeBulkloader;
-        ITreeIndexCursor cursor;
 
+        //count the number of tuples in the buddy btree
+        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
+                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null,
null);
+        IIndexCursor btreeCountingCursor = ((BTreeAccessor) memBTreeAccessor).createCountingSearchCursor();
+        memBTreeAccessor.search(btreeCountingCursor, btreeNullPredicate);
+        long numBTreeTuples = 0L;
+        try {
+            while (btreeCountingCursor.hasNext()) {
+                btreeCountingCursor.next();
+                ITupleReference countTuple = btreeCountingCursor.getTuple();
+                numBTreeTuples = IntegerPointable.getInteger(countTuple.getFieldData(0),
countTuple.getFieldStart(0));
+            }
+        } finally {
+            btreeCountingCursor.close();
+        }
+
+        IIndexBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false,
false);
+
+        ITreeIndexCursor cursor;
         IBinaryComparatorFactory[] linearizerArray = { linearizer };
 
         TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
                 linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
                 flushingComponent.getRTree().getBufferCache(), comparatorFields);
+
         // BulkLoad the tuples from the in-memory tree into the new disk
         // RTree.
-
         boolean isEmpty = true;
         try {
             while (rtreeScanCursor.hasNext()) {
@@ -199,7 +219,6 @@ public class LSMRTree extends AbstractLSMRTree {
         }
         rTreeTupleSorter.sort();
 
-        rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
         cursor = rTreeTupleSorter;
 
         if (!isEmpty) {
@@ -207,54 +226,24 @@ public class LSMRTree extends AbstractLSMRTree {
                 while (cursor.hasNext()) {
                     cursor.next();
                     ITupleReference frameTuple = cursor.getTuple();
-                    rTreeBulkloader.add(frameTuple);
+                    componentBulkLoader.add(frameTuple);
                 }
             } finally {
                 cursor.close();
             }
         }
 
-        rTreeBulkloader.end();
-
-        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
-                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null,
null);
-        IIndexCursor btreeCountingCursor = ((BTreeAccessor) memBTreeAccessor).createCountingSearchCursor();
-        memBTreeAccessor.search(btreeCountingCursor, btreeNullPredicate);
-        long numBTreeTuples = 0L;
-        try {
-            while (btreeCountingCursor.hasNext()) {
-                btreeCountingCursor.next();
-                ITupleReference countTuple = btreeCountingCursor.getTuple();
-                numBTreeTuples = IntegerPointable.getInteger(countTuple.getFieldData(0),
countTuple.getFieldStart(0));
-            }
-        } finally {
-            btreeCountingCursor.close();
-        }
-
-        int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
-        BloomFilterSpecification bloomFilterSpec =
-                BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-
+        // scan the memory BTree
         IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false);
         memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-        BTree diskBTree = component.getBTree();
-
-        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
-        IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples,
false);
-        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
-                bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-        // scan the memory BTree
         try {
             while (btreeScanCursor.hasNext()) {
                 btreeScanCursor.next();
                 ITupleReference frameTuple = btreeScanCursor.getTuple();
-                bTreeBulkloader.add(frameTuple);
-                builder.add(frameTuple);
+                ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(frameTuple);
             }
         } finally {
             btreeScanCursor.close();
-            builder.end();
         }
 
         if (component.getLSMComponentFilter() != null) {
@@ -266,7 +255,8 @@ public class LSMRTree extends AbstractLSMRTree {
         }
         // Note. If we change the filter to write to metadata object, we don't need the if
block above
         flushingComponent.getMetadata().copy(component.getMetadata());
-        bTreeBulkloader.end();
+
+        componentBulkLoader.end();
         return component;
     }
 
@@ -282,40 +272,46 @@ public class LSMRTree extends AbstractLSMRTree {
         LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
                 mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
 
+        IIndexBulkLoader componentBulkLoader;
+
         // In case we must keep the deleted-keys BTrees, then they must be merged *before*
merging the r-trees so that
         // lsmHarness.endSearch() is called once when the r-trees have been merged.
-        BTree btree = mergedComponent.getBTree();
-        IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
         if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
                 .get(diskComponents.size() - 1)) {
             // Keep the deleted tuples since the oldest disk component is not included in
the merge operation
 
-            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
-            search(opCtx, btreeCursor, rtreeSearchPred);
-
             long numElements = 0L;
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
                 numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false,
numElements, false, false);
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
+            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+            search(opCtx, btreeCursor, rtreeSearchPred);
             try {
                 while (btreeCursor.hasNext()) {
                     btreeCursor.next();
                     ITupleReference tuple = btreeCursor.getTuple();
-                    btreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
                 }
             } finally {
                 btreeCursor.close();
-                builder.end();
             }
+        } else {
+            //no buddy-btree needed
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false,
0L, false, false);
+        }
+
+        //search old rtree components
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference frameTuple = cursor.getTuple();
+                componentBulkLoader.add(frameTuple);
+            }
+        } finally {
+            cursor.close();
         }
 
         if (mergedComponent.getLSMComponentFilter() != null) {
@@ -327,19 +323,8 @@ public class LSMRTree extends AbstractLSMRTree {
             getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
             getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree());
         }
-        btreeBulkLoader.end();
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false,
0L, false);
-        try {
-            while (cursor.hasNext()) {
-                cursor.next();
-                ITupleReference frameTuple = cursor.getTuple();
-                bulkLoader.add(frameTuple);
-            }
-        } finally {
-            cursor.close();
-        }
-        bulkLoader.end();
+        componentBulkLoader.end();
 
         return mergedComponent;
     }
@@ -358,6 +343,25 @@ public class LSMRTree extends AbstractLSMRTree {
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float
fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean
withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (numElementsHint > 0) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component,
bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields,
filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component,
bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMRTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
index edc3e7d..fbdd37a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
@@ -20,110 +20,41 @@ package org.apache.hyracks.storage.am.lsm.rtree.impls;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LSMRTreeBulkLoader implements IIndexBulkLoader {
     private final ILSMDiskComponent component;
-    private final IIndexBulkLoader bulkLoader;
-    private final IIndexBulkLoader buddyBTreeBulkloader;
-    private boolean cleanedUpArtifacts = false;
-    private boolean isEmptyComponent = true;
-    public final PermutingTupleReference indexTuple;
-    public final PermutingTupleReference filterTuple;
-    public final MultiComparator filterCmp;
     private final LSMRTree lsmIndex;
+    private final IIndexBulkLoader componentBulkLoader;
 
     public LSMRTreeBulkLoader(LSMRTree lsmIndex, float fillFactor, boolean verifyInput, long
numElementsHint)
             throws HyracksDataException {
         this.lsmIndex = lsmIndex;
         // Note that by using a flush target file name, we state that the
         // new bulk loaded tree is "newer" than any other merged tree.
-        component = lsmIndex.createBulkLoadTarget();
-        bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor,
verifyInput,
-                numElementsHint, false);
-        buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
verifyInput,
-                numElementsHint, false);
-        if (lsmIndex.getFilterFields() != null) {
-            indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
-            filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-            filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
-        } else {
-            indexTuple = null;
-            filterCmp = null;
-            filterTuple = null;
-        }
+        this.component = lsmIndex.createBulkLoadTarget();
+        this.componentBulkLoader =
+                lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint,
false, true);
     }
 
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            bulkLoader.add(t);
-
-            if (filterTuple != null) {
-                filterTuple.reset(tuple);
-                component.getLSMComponentFilter().update(filterTuple, filterCmp);
-            }
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
+        componentBulkLoader.add(tuple);
     }
 
     @Override
     public void end() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-
-            if (component.getLSMComponentFilter() != null) {
-                lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                        ((LSMRTreeDiskComponent) component).getRTree());
-            }
-
-            bulkLoader.end();
-            buddyBTreeBulkloader.end();
-
-            if (isEmptyComponent) {
-                cleanupArtifacts();
-            } else {
-                lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH,
null, component);
-                lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
-            }
+        componentBulkLoader.end();
+        if (component.getComponentSize() > 0) {
+            lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null,
component);
+            lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
         }
     }
 
     @Override
     public void abort() throws HyracksDataException {
-        if (bulkLoader != null) {
-            bulkLoader.abort();
-        }
-        if (buddyBTreeBulkloader != null) {
-            buddyBTreeBulkloader.abort();
-        }
-    }
-
-    protected void cleanupArtifacts() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            cleanedUpArtifacts = true;
-            ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-            ((LSMRTreeDiskComponent) component).getRTree().destroy();
-            ((LSMRTreeDiskComponent) component).getBTree().deactivate();
-            ((LSMRTreeDiskComponent) component).getBTree().destroy();
-            ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate();
-            ((LSMRTreeDiskComponent) component).getBloomFilter().destroy();
-        }
+        componentBulkLoader.abort();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..ff0a299
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader
{
+
+    //with filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification
bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification
bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBTree();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index a20e6f2..24c46d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -136,14 +135,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(),
null, null, true);
-        RTree diskRTree = component.getRTree();
-
-        // scan the memory BTree
-        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
-                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
-        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null,
null);
-        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+        IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f,
false, 0L, false, false);
 
         // Since the LSM-RTree is used as a secondary assumption, the
         // primary key will be the last comparator in the BTree comparators
@@ -151,12 +143,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
                 linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
                 flushingComponent.getRTree().getBufferCache(), comparatorFields);
 
-        TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
-                linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
-                flushingComponent.getBTree().getBufferCache(), comparatorFields);
-        // BulkLoad the tuples from the in-memory tree into the new disk
-        // RTree.
-
         boolean isEmpty = true;
         try {
             while (rtreeScanCursor.hasNext()) {
@@ -171,6 +157,16 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             rTreeTupleSorter.sort();
         }
 
+        // scan the memory BTree
+        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
+                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
+        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null,
null);
+        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+        TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
+                linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
+                flushingComponent.getBTree().getBufferCache(), comparatorFields);
+
         isEmpty = true;
         try {
             while (btreeScanCursor.hasNext()) {
@@ -185,7 +181,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             bTreeTupleSorter.sort();
         }
 
-        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
         LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
                 bTreeTupleSorter, comparatorFields, linearizerArray);
         cursor.open(null, null);
@@ -195,7 +190,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
 
-                rTreeBulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -209,8 +204,8 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
         flushingComponent.getMetadata().copy(component.getMetadata());
-        rTreeBulkloader.end();
 
+        componentBulkLoader.end();
         return component;
     }
 
@@ -225,13 +220,13 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
 
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(),
null, null, true);
-        RTree mergedRTree = component.getRTree();
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false);
+
+        IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f,
false, 0L, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                bulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -245,7 +240,8 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
             getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
-        bulkloader.end();
+
+        componentBulkLoader.end();
 
         return component;
     }
@@ -258,6 +254,20 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float
fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean
withFilter)
+            throws HyracksDataException {
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent)
component, null,
+                    fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
treeFields,
+                    filterFields, MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent)
component, null,
+                    fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
@@ -265,91 +275,35 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
 
     public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader bulkLoader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final IIndexBulkLoader componentBulkLoader;
 
         public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput,
long numElementsHint)
                 throws HyracksDataException {
-            // Note that by using a flush target file name, we state that the
-            // new bulk loaded tree is "newer" than any other merged tree.
-
             component = createBulkLoadTarget();
-            bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor,
verifyInput,
-                    numElementsHint, false);
-
-            if (getFilterFields() != null) {
-                indexTuple = new PermutingTupleReference(getTreeFields());
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(getFilterFields());
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+
+            componentBulkLoader =
+                    createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint,
false, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-
-                if (component.getLSMComponentFilter() != null) {
-                    getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                            ((LSMRTreeDiskComponent) component).getRTree());
-                }
-                bulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    getLsmHarness().addBulkLoadedComponent(component);
-                }
+
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (bulkLoader != null) {
-                bulkLoader.abort();
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-                ((LSMRTreeDiskComponent) component).getRTree().destroy();
+            if (componentBulkLoader != null) {
+                componentBulkLoader.abort();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1ded1616/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
new file mode 100644
index 0000000..88a2e1e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader
{
+
+    //with filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput,
long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields,
int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput,
long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected ITreeIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+}


Mime
View raw message