asterixdb-commits mailing list archives

From: ima...@apache.org
Subject: [2/5] incubator-asterixdb-hyracks git commit: Make LSM bulkload append-only and write-once.
Date: Mon, 23 Nov 2015 17:07:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 069a3b5..2388cff 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -124,7 +124,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         List<ILSMComponent> immutableComponents = diskComponents;
         for (ILSMComponent c : immutableComponents) {
             RTree rtree = ((LSMRTreeDiskComponent) c).getRTree();
-            rtree.deactivate();
+            rtree.deactivateCloseHandle();
         }
         isActivated = false;
     }
@@ -233,7 +233,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             bTreeTupleSorter.sort();
         }
 
-        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
+        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false, true);
         LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
                 bTreeTupleSorter, comparatorFields, linearizerArray);
         cursor.open(null, null);
@@ -249,15 +249,15 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             cursor.close();
         }
 
-        rTreeBulkloader.end();
-
         if (component.getLSMComponentFilter() != null) {
             List<ITupleReference> filterTuples = new ArrayList<ITupleReference>();
             filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
             filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
-            filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getRTree());
+            filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getRTree()
+            );
         }
+        rTreeBulkloader.end();
 
         return component;
     }
@@ -293,7 +293,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(), null,
                 null, true);
         RTree mergedRTree = component.getRTree();
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false);
+        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false, true);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
@@ -303,8 +303,6 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         } finally {
             cursor.close();
         }
-        bulkloader.end();
-
         if (component.getLSMComponentFilter() != null) {
             List<ITupleReference> filterTuples = new ArrayList<ITupleReference>();
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
@@ -312,8 +310,10 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
                 filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
             }
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
-            filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getBTree());
+            filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getBTree()
+            );
         }
+        bulkloader.end();
 
         return component;
     }
@@ -379,7 +379,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
                 throw new TreeIndexException(e);
             }
             bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, false);
+                    numElementsHint, false, true);
 
             if (filterFields != null) {
                 indexTuple = new PermutingTupleReference(rtreeFields);
@@ -422,12 +422,12 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         @Override
         public void end() throws HyracksDataException, IndexException {
             if (!cleanedUpArtifacts) {
-                bulkLoader.end();
 
                 if (component.getLSMComponentFilter() != null) {
                     filterManager.writeFilterInfo(component.getLSMComponentFilter(),
                             ((LSMRTreeDiskComponent) component).getRTree());
                 }
+                bulkLoader.end();
 
                 if (isEmptyComponent) {
                     cleanupArtifacts();
@@ -437,6 +437,13 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
             }
         }
 
+        @Override
+        public void abort() throws HyracksDataException {
+            if (bulkLoader != null) {
+                bulkLoader.abort();
+            }
+        }
+
         protected void cleanupArtifacts() throws HyracksDataException {
             if (!cleanedUpArtifacts) {
                 cleanedUpArtifacts = true;
@@ -450,11 +457,18 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
     @Override
     public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
         RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree();
-        forceFlushDirtyPages(rtree);
         markAsValidInternal(rtree);
     }
 
     @Override
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, boolean appendOnly) throws IndexException {
+        if (!appendOnly)
+            throw new UnsupportedOperationException("LSM indexes don't support in-place modification");
+        return createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
+    @Override
     public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
         Set<String> files = new HashSet<String>();
 
@@ -463,5 +477,4 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
 
         return files;
     }
-
 }
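
For reference, a minimal sketch of how a caller drives the new append-only bulk-load contract end to end, including the new abort() path. The wrapper class, method, and variable names are illustrative only, not part of this commit:

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
    import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader;
    import org.apache.hyracks.storage.am.common.api.IndexException;
    import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples;

    final class AppendOnlyBulkLoadSketch {
        static void load(LSMRTreeWithAntiMatterTuples index, Iterable<ITupleReference> tuples)
                throws HyracksDataException, IndexException {
            // appendOnly == false now throws UnsupportedOperationException, since
            // LSM disk components are never modified in place.
            IIndexBulkLoader loader = index.createBulkLoader(1.0f, false, 0L, false, true);
            boolean succeeded = false;
            try {
                for (ITupleReference t : tuples) {
                    loader.add(t); // tuples must already be in bulk-load order
                }
                loader.end();      // writes the component filter, then seals the component
                succeeded = true;
            } finally {
                if (!succeeded) {
                    loader.abort(); // releases confiscated pages (abort() is added in this commit)
                }
            }
        }
    }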

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/RTreeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/RTreeFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/RTreeFactory.java
index c35ccb3..041d8df 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/RTreeFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/RTreeFactory.java
@@ -20,8 +20,9 @@
 package org.apache.hyracks.storage.am.lsm.rtree.impls;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.storage.am.common.api.IFreePageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.IMetadataManagerFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -32,7 +33,7 @@ import org.apache.hyracks.storage.common.file.IFileMapProvider;
 public class RTreeFactory extends TreeIndexFactory<RTree> {
 
     public RTreeFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
-            IFreePageManagerFactory freePageManagerFactory, ITreeIndexFrameFactory interiorFrameFactory,
+            IMetadataManagerFactory freePageManagerFactory, ITreeIndexFrameFactory interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount) {
         super(bufferCache, fileMapProvider, freePageManagerFactory, interiorFrameFactory, leafFrameFactory,
                 cmpFactories, fieldCount);
@@ -40,8 +41,12 @@ public class RTreeFactory extends TreeIndexFactory<RTree> {
 
     @Override
     public RTree createIndexInstance(FileReference file) throws IndexException {
-        return new RTree(bufferCache, fileMapProvider, freePageManagerFactory.createFreePageManager(),
-                interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file);
+        try {
+            return new RTree(bufferCache, fileMapProvider, freePageManagerFactory.createFreePageManager(),
+                    interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file);
+        } catch (HyracksDataException e) {
+            throw new IndexException(e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index 574dfcd..d34f4c2 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -36,7 +36,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import org.apache.hyracks.storage.am.common.api.TreeIndexException;
 import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import org.apache.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
+import org.apache.hyracks.storage.am.common.freepage.LinkedListMetadataManagerFactory;
 import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -97,7 +97,7 @@ public class LSMRTreeUtils {
         ITreeIndexFrameFactory btreeLeafFrameFactory = new BTreeNSMLeafFrameFactory(btreeTupleWriterFactory);
 
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
-        LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(diskBufferCache,
+        LinkedListMetadataManagerFactory freePageManagerFactory = new LinkedListMetadataManagerFactory(diskBufferCache,
                 metaFrameFactory);
 
         TreeIndexFactory<RTree> diskRTreeFactory = new RTreeFactory(diskBufferCache, diskFileMapProvider,
@@ -164,7 +164,7 @@ public class LSMRTreeUtils {
                 valueProviderFactories, rtreePolicyType);
 
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
-        LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(diskBufferCache,
+        LinkedListMetadataManagerFactory freePageManagerFactory = new LinkedListMetadataManagerFactory(diskBufferCache,
                 metaFrameFactory);
 
         TreeIndexFactory<RTree> diskRTreeFactory = new RTreeFactory(diskBufferCache, diskFileMapProvider,
@@ -240,7 +240,7 @@ public class LSMRTreeUtils {
         ITreeIndexFrameFactory btreeLeafFrameFactory = new BTreeNSMLeafFrameFactory(btreeTupleWriterFactory);
 
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
-        LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(diskBufferCache,
+        LinkedListMetadataManagerFactory freePageManagerFactory = new LinkedListMetadataManagerFactory(diskBufferCache,
                 metaFrameFactory);
 
         TreeIndexFactory<RTree> diskRTreeFactory = new RTreeFactory(diskBufferCache, diskFileMapProvider,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
index 7b357f0..c5159e1 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
@@ -225,12 +225,12 @@ public class RStarTreePolicy implements IRTreePolicy {
 
         splitKey.initData(splitKeySize);
         leftRTreeFrame.adjustMBR();
-        rTreeTupleWriterleftRTreeFrame.writeTupleFields(leftRTreeFrame.getTuples(), 0,
+        rTreeTupleWriterleftRTreeFrame.writeTupleFields(leftRTreeFrame.getMBRTuples(), 0,
                 rTreeSplitKey.getLeftPageBuffer(), 0);
         rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
 
         ((IRTreeFrame) rightFrame).adjustMBR();
-        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getMBRTuples(), 0,
                 rTreeSplitKey.getRightPageBuffer(), 0);
         rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
index 37ba2a5..2331f3b 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
@@ -36,7 +36,7 @@ public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeF
     protected static final int pageNsnOff = smFlagOff + 1;
     protected static final int rightPageOff = pageNsnOff + 8;
 
-    protected ITreeIndexTupleReference[] tuples;
+    protected ITreeIndexTupleReference[] mbrTuples;
     protected ITreeIndexTupleReference cmpFrameTuple;
 
     private static final double doubleEpsilon = computeDoubleEpsilon();
@@ -47,9 +47,9 @@ public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeF
     public RTreeNSMFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
             RTreePolicyType rtreePolicyType) {
         super(tupleWriter, new UnorderedSlotManager());
-        this.tuples = new ITreeIndexTupleReference[keyValueProviders.length];
+        this.mbrTuples = new ITreeIndexTupleReference[keyValueProviders.length];
         for (int i = 0; i < keyValueProviders.length; i++) {
-            this.tuples[i] = tupleWriter.createTupleReference();
+            this.mbrTuples[i] = tupleWriter.createTupleReference();
         }
         cmpFrameTuple = tupleWriter.createTupleReference();
         this.keyValueProviders = keyValueProviders;
@@ -111,8 +111,8 @@ public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeF
         buf.putInt(rightPageOff, rightPage);
     }
 
-    public ITreeIndexTupleReference[] getTuples() {
-        return tuples;
+    public ITreeIndexTupleReference[] getMBRTuples() {
+        return mbrTuples;
     }
 
     @Override
@@ -123,7 +123,7 @@ public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeF
 
     abstract public int getTupleSize(ITupleReference tuple);
 
-    public void adjustMBRImpl(ITreeIndexTupleReference[] tuples) {
+    protected void calculateMBRImpl(ITreeIndexTupleReference[] tuples) {
         int maxFieldPos = keyValueProviders.length / 2;
         for (int i = 1; i < getTupleCount(); i++) {
             frameTuple.resetByTupleIndex(this, i);
@@ -145,12 +145,12 @@ public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeF
 
     @Override
     public void adjustMBR() {
-        for (int i = 0; i < tuples.length; i++) {
-            tuples[i].setFieldCount(getFieldCount());
-            tuples[i].resetByTupleIndex(this, 0);
+        for (int i = 0; i < mbrTuples.length; i++) {
+            mbrTuples[i].setFieldCount(getFieldCount());
+            mbrTuples[i].resetByTupleIndex(this, 0);
         }
 
-        adjustMBRImpl(tuples);
+        calculateMBRImpl(mbrTuples);
     }
 
     public abstract int getFieldCount();
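
A small sketch of how the renamed accessor is consumed; this mirrors the policy and bulk-load call sites elsewhere in this commit, with an illustrative helper class:

    import java.nio.ByteBuffer;

    import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
    import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;

    final class MbrSketch {
        // Recompute a frame's MBR and serialize it, e.g. into a split-key buffer.
        static void writeMbr(RTreeNSMFrame frame, RTreeTypeAwareTupleWriter writer, ByteBuffer target) {
            frame.adjustMBR(); // points mbrTuples at tuple 0, then widens them over every tuple in the frame
            writer.writeTupleFields(frame.getMBRTuples(), 0, target, 0);
        }
    }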

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
index 3b71be8..5c2e95e 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProvider;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.common.api.TreeIndexException;
+import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
 import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.common.ophelpers.SlotOffTupleOff;
@@ -40,7 +41,7 @@ import org.apache.hyracks.storage.am.rtree.impls.PathList;
 
 public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteriorFrame {
 
-    private static final int childPtrSize = 4;
+    public static final int childPtrSize = 4;
     private IBinaryComparator childPtrCmp = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
             .createBinaryComparator();
     private final int keyFieldCount;
@@ -53,7 +54,7 @@ public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteri
     }
 
     @Override
-    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+    public int getBytesRequiredToWriteTuple(ITupleReference tuple) {
         return tupleWriter.bytesRequired(tuple) + childPtrSize + slotManager.getSlotSize();
     }
 
@@ -184,6 +185,16 @@ public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteri
             return FrameOpSpaceStatus.INSUFFICIENT_SPACE;
     }
 
+    public FrameOpSpaceStatus hasSpaceInsert(int bytesRequired) {
+        if (bytesRequired + slotManager.getSlotSize() <= buf.capacity() - buf.getInt(freeSpaceOff)
+                - (buf.getInt(tupleCountOff) * slotManager.getSlotSize()))
+            return FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
+        else if (bytesRequired + slotManager.getSlotSize() <= buf.getInt(totalFreeSpaceOff))
+            return FrameOpSpaceStatus.SUFFICIENT_SPACE;
+        else
+            return FrameOpSpaceStatus.INSUFFICIENT_SPACE;
+    }
+
     @Override
     public void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp) throws TreeIndexException {
         frameTuple.setFieldCount(cmp.getKeyFieldCount());
@@ -221,7 +232,7 @@ public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteri
     @Override
     public void insert(ITupleReference tuple, int tupleIndex) {
         frameTuple.setFieldCount(tuple.getFieldCount());
-        slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+        slotManager.insertSlot(AbstractSlotManager.GREATEST_KEY_INDICATOR, buf.getInt(freeSpaceOff));
         int freeSpace = buf.getInt(freeSpaceOff);
         int bytesWritten = tupleWriter.writeTupleFields(tuple, 0, tuple.getFieldCount(), buf.array(), freeSpace);
         System.arraycopy(tuple.getFieldData(tuple.getFieldCount() - 1), getChildPointerOff(tuple), buf.array(),
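
The new byte-count overload of hasSpaceInsert lets the append-only bulk loader (see RTree.propagateBulk later in this commit) check for room for two entries at once, which guarantees that finalizing a frontier page never forces a split. A sketch of that check, using only names from this diff:

    import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
    import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
    import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;

    final class HeadroomSketch {
        // True when the interior frontier page cannot hold two more entries,
        // meaning the loader must finalize it and start a new frontier page.
        static boolean needsNewFrontier(RTreeNSMInteriorFrame interiorFrame, ITreeIndexTupleReference mbrTuple) {
            int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() + RTreeNSMInteriorFrame.childPtrSize);
            return interiorFrame.hasSpaceInsert(sizeOfTwoTuples) != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
        }
    }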

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
index 8faf5a2..591ce67 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
@@ -35,7 +35,7 @@ public class RTreeNSMLeafFrame extends RTreeNSMFrame implements IRTreeLeafFrame
     }
 
     @Override
-    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+    public int getBytesRequiredToWriteTuple(ITupleReference tuple) {
         return tupleWriter.bytesRequired(tuple) + slotManager.getSlotSize();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreePolicy.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreePolicy.java
index 2bcaa77..1c950c6 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreePolicy.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreePolicy.java
@@ -196,11 +196,11 @@ public class RTreePolicy implements IRTreePolicy {
 
         splitKey.initData(splitKeySize);
         leftRTreeFrame.adjustMBR();
-        rTreeTupleWriterLeftFrame.writeTupleFields(leftRTreeFrame.getTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeTupleWriterLeftFrame.writeTupleFields(leftRTreeFrame.getMBRTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
         rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
 
         ((IRTreeFrame) rightFrame).adjustMBR();
-        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getMBRTuples(), 0,
                 rTreeSplitKey.getRightPageBuffer(), 0);
         rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 2cb72cc..ada54d2 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.rtree.impls;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -28,20 +29,9 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IFreePageManager;
-import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader;
-import org.apache.hyracks.storage.am.common.api.IIndexCursor;
-import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
-import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import org.apache.hyracks.storage.am.common.api.IndexException;
-import org.apache.hyracks.storage.am.common.api.TreeIndexException;
+import org.apache.hyracks.storage.am.common.api.*;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
 import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
 import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
@@ -55,6 +45,7 @@ import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
 import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
 import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -67,7 +58,7 @@ public class RTree extends AbstractTreeIndex {
 
     private final int maxTupleSize;
 
-    public RTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider, IFreePageManager freePageManager,
+    public RTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider, IMetaDataPageManager freePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory,
             IBinaryComparatorFactory[] cmpFactories, int fieldCount, FileReference file) {
         super(bufferCache, fileMapProvider, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories,
@@ -157,8 +148,8 @@ public class RTree extends AbstractTreeIndex {
     private void insert(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
             TreeIndexException {
         RTreeOpContext ctx = (RTreeOpContext) ictx;
-        int tupleSize = Math.max(ctx.leafFrame.getBytesRequriedToWriteTuple(tuple),
-                ctx.interiorFrame.getBytesRequriedToWriteTuple(tuple));
+        int tupleSize = Math.max(ctx.leafFrame.getBytesRequiredToWriteTuple(tuple),
+                ctx.interiorFrame.getBytesRequiredToWriteTuple(tuple));
         if (tupleSize > maxTupleSize) {
             throw new TreeIndexException("Record size (" + tupleSize + ") larger than maximum acceptable record size ("
                     + maxTupleSize + ")");
@@ -774,7 +765,7 @@ public class RTree extends AbstractTreeIndex {
         MultiComparator cmp = MultiComparator.create(cmpFactories);
         SearchPredicate searchPred = new SearchPredicate(null, cmp);
 
-        int currentPageId = rootPage;
+        int currentPageId = bulkloadLeafStart;
         int maxPageId = freePageManager.getMaxPage(ctx.metaFrame);
 
         ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
@@ -867,8 +858,14 @@ public class RTree extends AbstractTreeIndex {
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex) throws TreeIndexException {
         // TODO: verifyInput currently does nothing.
+        return createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, false);
+    }
+
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, boolean appendOnly) throws TreeIndexException {
+        // TODO: verifyInput currently does nothing.
         try {
-            return new RTreeBulkLoader(fillFactor);
+            return new RTreeBulkLoader(fillFactor, appendOnly);
         } catch (HyracksDataException e) {
             throw new TreeIndexException(e);
         }
@@ -879,17 +876,19 @@ public class RTree extends AbstractTreeIndex {
         RTreeTypeAwareTupleWriter tupleWriter = ((RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter());
         ITreeIndexTupleReference mbrTuple = interiorFrame.createTupleReference();
         ByteBuffer mbr;
+        List<Integer> prevNodeFrontierPages = new ArrayList<Integer>();
+        List<ICachedPage> pagesToWrite = new ArrayList<ICachedPage>();
 
-        public RTreeBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
-            super(fillFactor);
+        public RTreeBulkLoader(float fillFactor, boolean appendOnly) throws TreeIndexException, HyracksDataException {
+            super(fillFactor, appendOnly);
             prevInteriorFrame = interiorFrameFactory.createFrame();
         }
 
         @Override
         public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
             try {
-                int tupleSize = Math.max(leafFrame.getBytesRequriedToWriteTuple(tuple),
-                        interiorFrame.getBytesRequriedToWriteTuple(tuple));
+                int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
+                        interiorFrame.getBytesRequiredToWriteTuple(tuple));
                 if (tupleSize > maxTupleSize) {
                     throw new TreeIndexException("Space required for record (" + tupleSize
                             + ") larger than maximum acceptable size (" + maxTupleSize + ")");
@@ -907,22 +906,29 @@ public class RTree extends AbstractTreeIndex {
                 }
 
                 if (spaceUsed + spaceNeeded > leafMaxBytes) {
-                    propagateBulk(1, false);
 
-                    leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
-
-                    leafFrontier.page.releaseWriteLatch(true);
-                    bufferCache.unpin(leafFrontier.page);
+                    if (prevNodeFrontierPages.size() == 0) {
+                        prevNodeFrontierPages.add(leafFrontier.pageId);
+                    } else {
+                        prevNodeFrontierPages.set(0, leafFrontier.pageId);
+                    }
+                    pagesToWrite.clear();
+                    propagateBulk(1, false, pagesToWrite);
 
-                    leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId),
-                            true);
-                    leafFrontier.page.acquireWriteLatch();
+                    leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
+                    queue.put(leafFrontier.page);
+                    for (ICachedPage c : pagesToWrite) {
+                        queue.put(c);
+                    }
+                    leafFrontier.page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
+                            leafFrontier.pageId));
                     leafFrame.setPage(leafFrontier.page);
                     leafFrame.initBuffer((byte) 0);
+
                 }
 
                 leafFrame.setPage(leafFrontier.page);
-                leafFrame.insert(tuple, -1);
+                leafFrame.insert(tuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
             } catch (HyracksDataException e) {
                 handleException();
                 throw e;
@@ -934,12 +940,61 @@ public class RTree extends AbstractTreeIndex {
         }
 
         public void end() throws HyracksDataException {
-            propagateBulk(1, true);
+            pagesToWrite.clear();
+            //if writing a trivial 1-page tree, don't try to propagate up
+            if (nodeFrontiers.size() > 1) {
+                propagateBulk(1, true, pagesToWrite);
+            }
 
+            for (ICachedPage c : pagesToWrite) {
+                queue.put(c);
+            }
+            finish();
             super.end();
         }
 
-        protected void propagateBulk(int level, boolean toRoot) throws HyracksDataException {
+        @Override
+        public void abort() throws HyracksDataException {
+            super.handleException();
+        }
+
+        protected void finish() throws HyracksDataException {
+            int prevPageId = -1;
+            //here we assign physical identifiers to everything we can
+            for (NodeFrontier n : nodeFrontiers) {
+                //not a leaf
+                if (nodeFrontiers.indexOf(n) != 0) {
+                    interiorFrame.setPage(n.page);
+                    mbrTuple.resetByTupleOffset(mbr, 0);
+                    interiorFrame.insert(mbrTuple, -1);
+                    interiorFrame.getBuffer().putInt(
+                            interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                            prevPageId);
+
+                    int finalPageId = freePageManager.getFreePage(metaFrame);
+                    n.pageId = finalPageId;
+                    bufferCache.setPageDiskId(n.page, BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+                    //else we are looking at a leaf
+                }
+                //set the next guide MBR
+                //if propagateBulk didn't have to do anything, this may be unnecessary
+                if (nodeFrontiers.size() > 1 && nodeFrontiers.indexOf(n) < nodeFrontiers.size() - 1) {
+                    lowerFrame.setPage(n.page);
+                    ((RTreeNSMFrame) lowerFrame).adjustMBR();
+                    tupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
+                }
+                queue.put(n.page);
+                n.page = null;
+                prevPageId = n.pageId;
+            }
+            if (appendOnly) {
+                rootPage = nodeFrontiers.get(nodeFrontiers.size() - 1).pageId;
+            }
+            releasedLatches = true;
+        }
+
+        protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite)
+                throws HyracksDataException {
             boolean propagated = false;
 
             if (level == 1)
@@ -951,47 +1006,66 @@ public class RTree extends AbstractTreeIndex {
             if (level >= nodeFrontiers.size())
                 addLevel();
 
+            //adjust the tuple pointers of the lower frame to allow us to calculate our MBR
+            //if this is a leaf, then there is only one tuple, so this is trivial
             ((RTreeNSMFrame) lowerFrame).adjustMBR();
 
             if (mbr == null) {
-                int bytesRequired = tupleWriter.bytesRequired(((RTreeNSMFrame) lowerFrame).getTuples()[0], 0,
+                int bytesRequired = tupleWriter.bytesRequired(((RTreeNSMFrame) lowerFrame).getMBRTuples()[0], 0,
                         cmp.getKeyFieldCount())
                         + ((RTreeNSMInteriorFrame) interiorFrame).getChildPointerSize();
                 mbr = ByteBuffer.allocate(bytesRequired);
             }
-            tupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getTuples(), 0, mbr, 0);
+            tupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
             mbrTuple.resetByTupleOffset(mbr, 0);
 
             NodeFrontier frontier = nodeFrontiers.get(level);
             interiorFrame.setPage(frontier.page);
+            //see if we have space for two tuples; this works around a tricky boundary condition with sequential
+            //bulk load where finalization can possibly lead to a split
+            //TODO: accomplish this without wasting 1 tuple
+            int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() + RTreeNSMInteriorFrame.childPtrSize);
+            FrameOpSpaceStatus spaceForTwoTuples = (((RTreeNSMInteriorFrame) interiorFrame)
+                    .hasSpaceInsert(sizeOfTwoTuples));
+            if (spaceForTwoTuples != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
+
+                int finalPageId = freePageManager.getFreePage(metaFrame);
+                if (prevNodeFrontierPages.size() <= level) {
+                    prevNodeFrontierPages.add(finalPageId);
+                } else {
+                    prevNodeFrontierPages.set(level, finalPageId);
+                }
+                bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+                pagesToWrite.add(frontier.page);
 
-            interiorFrame.insert(mbrTuple, -1);
-
-            interiorFrame.getBuffer().putInt(
-                    interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
-                    nodeFrontiers.get(level - 1).pageId);
-
-            if (interiorFrame.hasSpaceInsert(mbrTuple) != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
                 lowerFrame = prevInteriorFrame;
                 lowerFrame.setPage(frontier.page);
 
-                propagateBulk(level + 1, toRoot);
-                propagated = true;
-
-                frontier.page.releaseWriteLatch(true);
-                bufferCache.unpin(frontier.page);
-                frontier.pageId = freePageManager.getFreePage(metaFrame);
-
-                frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
-                frontier.page.acquireWriteLatch();
+                frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
                 interiorFrame.setPage(frontier.page);
                 interiorFrame.initBuffer((byte) level);
+
+                interiorFrame.insert(mbrTuple, AbstractSlotManager.GREATEST_KEY_INDICATOR);
+
+                interiorFrame.getBuffer().putInt(
+                        interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                        prevNodeFrontierPages.get(level - 1));
+
+                propagateBulk(level + 1, toRoot, pagesToWrite);
+            } else if (interiorFrame.hasSpaceInsert(mbrTuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE
+                    && !toRoot) {
+
+                interiorFrame.insert(mbrTuple, -1);
+
+                interiorFrame.getBuffer().putInt(
+                        interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                        prevNodeFrontierPages.get(level - 1));
             }
 
-            if (toRoot && !propagated && level < nodeFrontiers.size() - 1) {
+            if (toRoot && level < nodeFrontiers.size() - 1) {
                 lowerFrame = prevInteriorFrame;
                 lowerFrame.setPage(frontier.page);
-                propagateBulk(level + 1, true);
+                propagateBulk(level + 1, true, pagesToWrite);
             }
 
             leafFrame.setPage(nodeFrontiers.get(0).page);
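
Taken together, confiscatePage, setPageDiskId, and the FIFO queue give every page in the bulk load a write-once lifecycle: fill a private copy, assign its final disk location, then queue it. A minimal sketch assembled from the calls in this diff; the wrapper class and method are illustrative:

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
    import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
    import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
    import org.apache.hyracks.storage.common.buffercache.BufferCache;
    import org.apache.hyracks.storage.common.buffercache.IBufferCache;
    import org.apache.hyracks.storage.common.buffercache.ICachedPage;
    import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
    import org.apache.hyracks.storage.common.file.BufferedFileHandle;

    final class WriteOnceSketch {
        static void emitPage(IBufferCache bufferCache, IFIFOPageQueue queue,
                IMetaDataPageManager freePageManager, ITreeIndexMetaDataFrame metaFrame,
                ITreeIndexFrame frame, int fileId, byte level) throws HyracksDataException {
            // 1. Confiscate a private, unlatched page from the cache; it has no disk id yet.
            ICachedPage page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
            frame.setPage(page);
            frame.initBuffer(level);
            // ... the loader fills the frame here ...
            // 2. Only when the page is complete does it receive its final on-disk identity.
            int pageId = freePageManager.getFreePage(metaFrame);
            bufferCache.setPageDiskId(page, BufferedFileHandle.getDiskPageId(fileId, pageId));
            // 3. Hand it to the FIFO writer; each page is written to disk exactly once.
            queue.put(page);
        }
    }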

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
index 729e7e0..5d8ce2e 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -104,6 +104,8 @@ public class RTreeSearchCursor implements ITreeIndexCursor {
             int pageId = pathList.getLastPageId();
             long parentLsn = pathList.getLastPageLsn();
             pathList.moveLast();
+            if (pageId < 0) throw new IllegalStateException();
+            if (fileId < 0) throw new IllegalStateException();
             ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
             node.acquireReadLatch();
             readLatched = true;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/util/RTreeUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/util/RTreeUtils.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/util/RTreeUtils.java
index 0e3eb80..234aa26 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/util/RTreeUtils.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/util/RTreeUtils.java
@@ -25,13 +25,13 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.data.std.api.IPointableFactory;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IFreePageManager;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.common.data.PointablePrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import org.apache.hyracks.storage.am.common.freepage.LinkedListFreePageManager;
+import org.apache.hyracks.storage.am.common.freepage.LinkedMetaDataPageManager;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
 import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
@@ -54,7 +54,7 @@ public class RTreeUtils {
                 valueProviderFactories, rtreePolicyType);
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
 
-        IFreePageManager freePageManager = new LinkedListFreePageManager(bufferCache, 0, metaFrameFactory);
+        IMetaDataPageManager freePageManager = new LinkedMetaDataPageManager(bufferCache, metaFrameFactory);
         RTree rtree = new RTree(bufferCache, fileMapProvider, freePageManager, interiorFrameFactory, leafFrameFactory,
                 cmpFactories, typeTraits.length, file);
         return rtree;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
new file mode 100644
index 0000000..08f467e
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public class AsyncFIFOPageQueueManager implements Runnable {
+    private final static boolean DEBUG = false;
+
+    protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<ICachedPage>();
+    volatile Thread writerThread;
+    protected AtomicBoolean poisoned = new AtomicBoolean(false);
+    protected BufferCache bufferCache;
+    volatile protected PageQueue pageQueue;
+
+    public AsyncFIFOPageQueueManager(BufferCache bufferCache){
+        this.bufferCache = bufferCache;
+    }
+
+    protected class PageQueue implements IFIFOPageQueue {
+        final IBufferCache bufferCache;
+        public final IFIFOPageWriter writer;
+
+        protected PageQueue(IBufferCache bufferCache, IFIFOPageWriter writer) {
+            if(DEBUG) System.out.println("[FIFO] New Queue");
+            this.bufferCache = bufferCache;
+            this.writer = writer;
+        }
+
+        protected IBufferCache getBufferCache() {
+            return bufferCache;
+        }
+
+        protected IFIFOPageWriter getWriter() {
+            return writer;
+        }
+
+        @Override
+        public void put(ICachedPage page) throws HyracksDataException {
+            try {
+                if(!poisoned.get()) {
+                    queue.put(page);
+                }
+                else{
+                    throw new HyracksDataException("Queue is closing");
+                }
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+    public PageQueue createQueue(IFIFOPageWriter writer) {
+        if (pageQueue == null) {
+            synchronized(this){
+                if (pageQueue == null) {
+                    writerThread = new Thread(this);
+                    writerThread.setName("FIFO Writer Thread");
+                    writerThread.start();
+                    pageQueue = new PageQueue(bufferCache,writer);
+                }
+            }
+        }
+        return pageQueue;
+    }
+
+    public void destroyQueue(){
+        poisoned.set(true);
+        //Dummy cached page to act as poison pill
+        CachedPage poisonPill = new CachedPage();
+        poisonPill.setQueueInfo(new QueueInfo(true,true));
+        if(writerThread == null){
+            synchronized (this){
+                if(writerThread == null) {
+                    return;
+                }
+            }
+        }
+
+        try{
+            synchronized(poisonPill){
+                queue.put(poisonPill);
+                while(queue.contains(poisonPill)){
+                    poisonPill.wait();
+                }
+            }
+        } catch (InterruptedException e){
+            e.printStackTrace();
+        }
+    }
+
+    public void finishQueue() {
+        if(DEBUG) System.out.println("[FIFO] Finishing Queue");
+        try {
+            //Dummy cached page to act as low water mark
+            CachedPage lowWater = new CachedPage();
+            lowWater.setQueueInfo(new QueueInfo(true,false));
+            synchronized(lowWater){
+                queue.put(lowWater);
+                while(queue.contains(lowWater)){
+                        lowWater.wait();
+                }
+            }
+        } catch (InterruptedException e) {
+            // TODO what do we do here?
+            e.printStackTrace();
+        }
+        if(DEBUG) System.out.println("[FIFO] Queue finished");
+    }
+
+    @Override
+    public void run() {
+        if(DEBUG) System.out.println("[FIFO] Writer started");
+        boolean die = false;
+        while (!die) {
+            try {
+                ICachedPage entry = queue.take();
+                if(entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()){
+                    synchronized(entry) {
+                        if(entry.getQueueInfo().isPoison()) { die = true; }
+                        entry.notifyAll();
+                        continue;
+                    }
+                }
+
+                if(DEBUG) System.out.println("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage)entry).dpid)+","
+                        + BufferedFileHandle.getPageId(((CachedPage)entry).dpid));
+
+                try {
+                    pageQueue.getWriter().write(entry, bufferCache);
+                } catch (HyracksDataException e) {
+                    //TODO: What do we do, if we could not write the page?
+                    e.printStackTrace();
+                }
+            } catch(InterruptedException e) {
+                continue;
+            }
+        }
+    }
+}
\ No newline at end of file
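
A minimal sketch of the intended producer/consumer usage of this new class; the driver method below is illustrative, and an IFIFOPageWriter implementation is assumed to be supplied by the caller (the sketch is assumed to live in the same buffercache package):

    import java.util.List;

    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class FifoQueueSketch {
        static void writeAll(BufferCache bufferCache, IFIFOPageWriter writer, List<ICachedPage> pages)
                throws HyracksDataException {
            AsyncFIFOPageQueueManager fifo = new AsyncFIFOPageQueueManager(bufferCache);
            IFIFOPageQueue queue = fifo.createQueue(writer); // lazily starts the writer thread
            for (ICachedPage p : pages) {
                queue.put(p);     // rejected with HyracksDataException once the queue is poisoned
            }
            fifo.finishQueue();   // low-water-mark barrier: returns once everything queued above is on disk
            fifo.destroyQueue();  // poison pill: stops the writer thread
        }
    }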

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/1a659da1/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 3892e0a..3a23c8d 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -49,10 +50,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
+    public static final boolean DEBUG = false;
 
     private final int pageSize;
     private final int maxOpenFiles;
-    private final IIOManager ioManager;
+    final IIOManager ioManager;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -60,8 +62,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final Set<Integer> virtualFiles;
+    private final AsyncFIFOPageQueueManager fifoWriter;
+    //DEBUG
+    private ArrayList<CachedPage> confiscatedPages;
+    private Lock confiscateLock;
+    private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner;
+    private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
+    //!DEBUG
     private IIOReplicationManager ioReplicationManager;
-    private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
+    public List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
+
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
@@ -85,6 +95,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         cleanerThread = new CleanerThread();
         executor.execute(cleanerThread);
         closed = false;
+
+        fifoWriter = new AsyncFIFOPageQueueManager(this);
+        if( DEBUG ) {
+            confiscatedPages = new ArrayList<CachedPage>();
+            confiscatedPagesOwner = new HashMap<CachedPage, StackTraceElement[]>();
+            confiscateLock = new ReentrantLock();
+            pinnedPageOwner = new ConcurrentHashMap<>();
+        }
     }
 
     //this constructor is used when replication is enabled to pass the IIOReplicationManager
@@ -126,8 +144,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     @Override
     public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
-        //pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since
+        // the synchronized block over the fileInfoMap is a hot spot.
+        if (DEBUG) {
+            pinSanityCheck(dpid);
+        }
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -150,10 +171,27 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
-        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
-        //pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since
+        // the synchronized block over the fileInfoMap is a hot spot.
+        if (DEBUG) {
+            pinSanityCheck(dpid);
+        }
         CachedPage cPage = findPage(dpid, false);
         if (!newPage) {
+            if (DEBUG) {
+                confiscateLock.lock();
+                try {
+                    for (CachedPage c : confiscatedPages) {
+                        if (c.dpid == dpid && c.confiscated.get()) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                } finally {
+                    confiscateLock.unlock();
+                }
+            }
             // Resolve race of multiple threads trying to read the page from
             // disk.
             synchronized (cPage) {
@@ -166,40 +204,17 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             cPage.valid = true;
         }
         pageReplacementStrategy.notifyCachePageAccess(cPage);
+        if (DEBUG) {
+            pinnedPageOwner.put((CachedPage) cPage, Thread.currentThread().getStackTrace());
+        }
         return cPage;
     }
 
-    @Override
-    /**
-     * Allocate and pin a virtual page. This is just like a normal page, except that it will never be flushed.
-     */
-    public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
-        //pinSanityCheck(vpid);
-        CachedPage cPage = findPage(vpid, true);
-        cPage.virtual = true;
-        return cPage;
-    }
 
-    @Override
-    /**
-     * Takes a virtual page, and copies it to a new page at the physical identifier.
-     */
-    //TODO: I should not have to copy the page. I should just append it to the end of the hash bucket, but this is
-    //safer/easier for now.
-    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
-        CachedPage virtPage = findPage(vpid, true); //should definitely succeed.
-        //pinSanityCheck(dpid); //debug
-        ICachedPage realPage = pin(dpid, false);
-        virtPage.acquireReadLatch();
-        realPage.acquireWriteLatch();
-        try {
-            System.arraycopy(virtPage.buffer.array(), 0, realPage.getBuffer().array(), 0, virtPage.buffer.capacity());
-        } finally {
-            realPage.releaseWriteLatch(true);
-            virtPage.releaseReadLatch();
-        }
-        virtPage.reset(-1); //now cause the virtual page to die
-        return realPage;
+
+    private boolean isVirtual(long vpid) throws HyracksDataException {
+        CachedPage virtPage = findPage(vpid, true);
+        return virtPage.confiscated.get();
     }
 
     private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
@@ -217,7 +232,13 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             try {
                 cPage = bucket.cachedPage;
                 while (cPage != null) {
+                    if (DEBUG) {
+                        assert bucket.cachedPage != bucket.cachedPage.next;
+                    }
                     if (cPage.dpid == dpid) {
+                        if (DEBUG) {
+                            assert !cPage.confiscated.get();
+                        }
                         cPage.pinCount.incrementAndGet();
                         return cPage;
                     }
@@ -237,28 +258,29 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                  * on the CachedPage may or may not be valid. 2. We have a pin
                  * on the CachedPage. We have to deal with three cases here.
                  * Case 1: The dpid on the CachedPage is invalid (-1). This
-                 * indicates that this buffer has never been used or is a virtual page. So we are the
-                 * only ones holding it. Get a lock on the required dpid's hash
-                 * bucket, check if someone inserted the page we want into the
-                 * table. If so, decrement the pincount on the victim and return
-                 * the winner page in the table. If such a winner does not
-                 * exist, insert the victim and return it. Case 2: The dpid on
-                 * the CachedPage is valid. Case 2a: The current dpid and
-                 * required dpid hash to the same bucket. Get the bucket lock,
-                 * check that the victim is still at pinCount == 1 If so check
-                 * if there is a winning CachedPage with the required dpid. If
-                 * so, decrement the pinCount on the victim and return the
-                 * winner. If not, update the contents of the CachedPage to hold
-                 * the required dpid and return it. If the picCount on the
-                 * victim was != 1 or CachedPage was dirty someone used the
-                 * victim for its old contents -- Decrement the pinCount and
-                 * retry. Case 2b: The current dpid and required dpid hash to
-                 * different buckets. Get the two bucket locks in the order of
-                 * the bucket indexes (Ordering prevents deadlocks). Check for
-                 * the existence of a winner in the new bucket and for potential
-                 * use of the victim (pinCount != 1). If everything looks good,
-                 * remove the CachedPage from the old bucket, and add it to the
-                 * new bucket and update its header with the new dpid.
+                 * indicates that this buffer has never been used or is a
+                 * confiscated page. So we are the only ones holding it. Get a lock
+                 * on the required dpid's hash bucket, check if someone inserted
+                 * the page we want into the table. If so, decrement the
+                 * pincount on the victim and return the winner page in the
+                 * table. If such a winner does not exist, insert the victim and
+                 * return it. Case 2: The dpid on the CachedPage is valid. Case
+                 * 2a: The current dpid and required dpid hash to the same
+                 * bucket. Get the bucket lock, check that the victim is still
+                 * at pinCount == 1. If so, check if there is a winning CachedPage
+                 * with the required dpid. If so, decrement the pinCount on the
+                 * victim and return the winner. If not, update the contents of
+                 * the CachedPage to hold the required dpid and return it. If
+                 * the pinCount on the victim was != 1 or the CachedPage was dirty,
+                 * someone used the victim for its old contents -- decrement the
+                 * pinCount and retry. Case 2b: The current dpid and required
+                 * dpid hash to different buckets. Get the two bucket locks in
+                 * the order of the bucket indexes (Ordering prevents
+                 * deadlocks). Check for the existence of a winner in the new
+                 * bucket and for potential use of the victim (pinCount != 1).
+                 * If everything looks good, remove the CachedPage from the old
+                 * bucket, and add it to the new bucket and update its header
+                 * with the new dpid.
                  */
                 if (victim.dpid < 0) {
                     /*
@@ -266,11 +288,24 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                      */
                     bucket.bucketLock.lock();
                     try {
+                        if (DEBUG) {
+                            confiscateLock.lock();
+                            try {
+                                if (confiscatedPages.contains(victim)) {
+                                    throw new IllegalStateException();
+                                }
+                            } finally {
+                                confiscateLock.unlock();
+                            }
+                        }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
                                 cPage.pinCount.incrementAndGet();
                                 victim.pinCount.decrementAndGet();
+                                if (DEBUG) {
+                                    assert !cPage.confiscated.get();
+                                }
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -281,6 +316,10 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                     } finally {
                         bucket.bucketLock.unlock();
                     }
+
+                    if (DEBUG) {
+                        assert !victim.confiscated.get();
+                    }
                     return victim;
                 }
                 int victimHash = hash(victim.dpid);
@@ -294,11 +333,24 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                             victim.pinCount.decrementAndGet();
                             continue;
                         }
+                        if (DEBUG) {
+                            confiscateLock.lock();
+                            try {
+                                if (confiscatedPages.contains(victim)) {
+                                    throw new IllegalStateException();
+                                }
+                            } finally {
+                                confiscateLock.unlock();
+                            }
+                        }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
                                 cPage.pinCount.incrementAndGet();
                                 victim.pinCount.decrementAndGet();
+                                if (DEBUG) {
+                                    assert !cPage.confiscated.get();
+                                }
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -307,6 +359,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                     } finally {
                         bucket.bucketLock.unlock();
                     }
+                    if (DEBUG) {
+                        assert !victim.confiscated.get();
+                    }
                     return victim;
                 } else {
                     /*
@@ -325,11 +380,19 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                             victim.pinCount.decrementAndGet();
                             continue;
                         }
+                        if (DEBUG) {
+                            if (confiscatedPages.contains(victim)) {
+                                throw new IllegalStateException();
+                            }
+                        }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
                                 cPage.pinCount.incrementAndGet();
                                 victim.pinCount.decrementAndGet();
+                                if (DEBUG) {
+                                    assert !cPage.confiscated.get();
+                                }
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -341,7 +404,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                             while (victimPrev != null && victimPrev.next != victim) {
                                 victimPrev = victimPrev.next;
                             }
-                            assert victimPrev != null;
+                            if (DEBUG) {
+                                assert victimPrev != null;
+                            }
                             victimPrev.next = victim.next;
                         }
                         victim.reset(dpid);
@@ -351,6 +416,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                         victimBucket.bucketLock.unlock();
                         bucket.bucketLock.unlock();
                     }
+                    if (DEBUG) {
+                        assert !victim.confiscated.get();
+                    }
                     return victim;
                 }
             }
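
The block comment above compresses the whole victim-resolution protocol
into prose; as a reading aid, the control flow reduces to roughly the
following condensed sketch. The helper names (insertOrYield, reuseInPlace,
relocate, bucketOf) are invented for illustration and do not exist in the
patch; locking and retry details are elided.

    // Condensed sketch of findPage()'s victim handling (illustrative only).
    CachedPage resolve(long dpid, CachedPage victim, CacheBucket target) {
        if (victim.dpid < 0) {            // Case 1: fresh or confiscated buffer
            return insertOrYield(target, dpid, victim);
        }
        CacheBucket home = bucketOf(victim.dpid);
        if (home == target) {             // Case 2a: same bucket
            return reuseInPlace(target, dpid, victim);
        }
        // Case 2b: different buckets -- lock both in bucket-index order to
        // avoid deadlock, then move the victim from its old bucket to target.
        return relocate(home, target, dpid, victim);
    }
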
@@ -382,6 +450,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                 .append('\n');
         buffer.append("Hash table size: ").append(pageMap.length).append('\n');
         buffer.append("Page Map:\n");
+        buffer.append("cpid -> [fileId:pageId, pinCount, valid/invalid, confiscated/physical, dirty/clean]");
         int nCachedPages = 0;
         for (int i = 0; i < pageMap.length; ++i) {
             CacheBucket cb = pageMap[i];
@@ -395,6 +464,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                                 .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
                                 .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
                                 .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                                .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
                                 .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
                         cp = cp.next;
                         ++nCachedPages;
@@ -405,6 +475,15 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             }
         }
         buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
+        if (DEBUG) {
+            confiscateLock.lock();
+            try {
+                buffer.append("Number of confiscated pages: ").append(confiscatedPages.size()).append('\n');
+            } finally {
+                confiscateLock.unlock();
+            }
+        }
         return buffer.toString();
     }
 
@@ -415,9 +494,13 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                 cPage.buffer);
     }
 
-    private BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException {
+    BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException {
+        return getFileInfo(BufferedFileHandle.getFileId(cPage.dpid));
+    }
+
+    BufferedFileHandle getFileInfo(int fileId) throws HyracksDataException {
         synchronized (fileInfoMap) {
-            BufferedFileHandle fInfo = fileInfoMap.get(BufferedFileHandle.getFileId(cPage.dpid));
+            BufferedFileHandle fInfo = fileInfoMap.get(fileId);
             if (fInfo == null) {
                 throw new HyracksDataException("No such file mapped");
             }
@@ -443,6 +526,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         if (closed) {
             throw new HyracksDataException("unpin called on a closed cache");
         }
+        if (DEBUG) {
+            pinnedPageOwner.remove(page);
+        }
         ((CachedPage) page).pinCount.decrementAndGet();
     }
 
@@ -462,7 +548,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     @Override
     public ICachedPageInternal getPage(int cpid) {
-        return cachedPages.get(cpid);
+        synchronized (cachedPages) {
+            return cachedPages.get(cpid);
+        }
     }
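
getPage and addPage now synchronize on cachedPages because pages are added
dynamically while other threads, notably the cleaner below, index into the
list. The cleaner's run loop, rewritten further down, follows the same
discipline: it re-reads the bound under the monitor on every iteration
instead of snapshotting it once. The sketch below uses cachedPages.size()
as a stand-in for pageReplacementStrategy.getNumPages(), and process is a
hypothetical placeholder for cleanPage:

    // Iterating a list that grows concurrently: re-check the bound under
    // the shared monitor on each pass rather than caching it up front.
    int i = 0;
    while (true) {
        synchronized (cachedPages) {
            if (i >= cachedPages.size()) {
                break;
            }
            process((CachedPage) cachedPages.get(i));
        }
        i++;
    }
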
 
     private class CleanerThread extends Thread {
@@ -482,7 +570,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         }
 
         public void cleanPage(CachedPage cPage, boolean force) {
-            if (cPage.dirty.get() && !cPage.virtual) {
+            if (cPage.dirty.get() && !cPage.confiscated.get()) {
                 boolean proceed = false;
                 if (force) {
                     cPage.latch.writeLock().lock();
@@ -529,10 +617,15 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             try {
                 while (true) {
                     pageCleanerPolicy.notifyCleanCycleStart(this);
-                    int numPages = pageReplacementStrategy.getNumPages();
-                    for (int i = 0; i < numPages; ++i) {
-                        CachedPage cPage = (CachedPage) cachedPages.get(i);
-                        cleanPage(cPage, false);
+                    int curPage = 0;
+                    while (true) {
+                        synchronized (cachedPages) {
+                            if (curPage >= pageReplacementStrategy.getNumPages()) {
+                                break;
+                            }
+                            cleanPage((CachedPage) cachedPages.get(curPage), false);
+                        }
+                        curPage++;
                     }
                     if (shutdownStart) {
                         break;
@@ -551,6 +644,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     @Override
     public void close() {
         closed = true;
+        fifoWriter.destroyQueue();
         synchronized (cleanerThread) {
             cleanerThread.shutdownStart = true;
             cleanerThread.notifyAll();
@@ -698,7 +792,10 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                 pinCount = cPage.pinCount.get();
             }
             if (pinCount > 0) {
-                throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
+                throw new IllegalStateException("Page " + BufferedFileHandle.getFileId(cPage.dpid) + ":"
+                        + BufferedFileHandle.getPageId(cPage.dpid)
+                        + " is pinned and file is being closed. Pincount is: " + pinCount + " Page is confiscated: "
+                        + cPage.confiscated);
             }
             cPage.invalidate();
             return true;
@@ -749,12 +846,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
         }
-        if (flushDirtyPages) {
-            synchronized (fileInfoMap) {
-                sweepAndFlush(fileId, flushDirtyPages);
-            }
-        }
         synchronized (fileInfoMap) {
+            sweepAndFlush(fileId, flushDirtyPages);
             BufferedFileHandle fInfo = null;
             try {
                 fInfo = fileInfoMap.get(fileId);
@@ -819,7 +912,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     @Override
     public void addPage(ICachedPageInternal page) {
-        cachedPages.add(page);
+        synchronized (cachedPages) {
+            cachedPages.add(page);
+        }
     }
 
     @Override
@@ -828,6 +923,214 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     }
 
     @Override
+    public int getNumPagesOfFile(int fileId) throws HyracksDataException {
+        synchronized (fileInfoMap) {
+            BufferedFileHandle fInfo = fileInfoMap.get(fileId);
+            if (fInfo == null) {
+                throw new HyracksDataException("No such file mapped for fileId:" + fileId);
+            }
+            if (DEBUG) {
+                assert ioManager.getSize(fInfo.getFileHandle()) % getPageSize() == 0;
+            }
+            return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSize());
+        }
+    }
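
For instance (illustrative numbers), a 1 MiB file with a 4 KiB page size
reports 256 pages; the DEBUG assertion flags a file whose length is not an
exact page multiple, which would indicate a truncated or torn write.
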
+
+    @Override
+    public void adviseWontNeed(ICachedPage page) {
+        pageReplacementStrategy.adviseWontNeed((ICachedPageInternal) page);
+    }
+
+    @Override
+    public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
+        while (true) {
+            int startCleanedCount = cleanerThread.cleanedCount;
+            ICachedPage returnPage = null;
+            CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
+            if (victim != null) {
+                if (DEBUG) {
+                    assert !victim.confiscated.get();
+                }
+                // find a page that would possibly be evicted anyway
+                // Case 1 from findPage()
+                if (victim.dpid < 0) { // new page
+                    if (victim.pinCount.get() != 1) {
+                        victim.pinCount.decrementAndGet();
+                        continue;
+                    }
+                    returnPage = victim;
+                    ((CachedPage) returnPage).dpid = dpid;
+                } else {
+                    // Case 2a/b
+                    int pageHash = hash(victim.getDiskPageId());
+                    CacheBucket bucket = pageMap[pageHash];
+                    bucket.bucketLock.lock();
+                    try {
+                        // readjust the next pointers to remove this page from
+                        // the pagemap
+                        CachedPage curr = bucket.cachedPage;
+                        CachedPage prev = null;
+                        boolean found = false;
+                        //traverse the bucket's linked list to find the victim.
+                        while (curr != null) {
+                            if (curr == victim) { // we found where the victim
+                                                  // resides in the hash table
+                                if (victim.pinCount.get() != 1) {
+                                    victim.pinCount.decrementAndGet();
+                                    break;
+                                }
+                                // if this is the first page in the bucket
+                                if (prev == null) {
+                                    if (DEBUG) {
+                                        assert curr != curr.next;
+                                    }
+                                    bucket.cachedPage = bucket.cachedPage.next;
+                                    found = true;
+                                    break;
+                                    // otherwise, make the previous node
+                                    // point past the victim
+                                } else {
+                                    if (DEBUG) {
+                                        assert curr.next != curr;
+                                    }
+                                    prev.next = curr.next;
+                                    curr.next = null;
+                                    if (DEBUG) {
+                                        assert prev.next != prev;
+                                    }
+                                    found = true;
+                                    break;
+                                }
+                            }
+                            // go to the next entry
+                            prev = curr;
+                            curr = curr.next;
+                        }
+                        if (found) {
+                            returnPage = victim;
+                            ((CachedPage) returnPage).dpid = dpid;
+                        } //otherwise, someone took the same victim before we acquired the lock. try again!
+                    } finally {
+                        bucket.bucketLock.unlock();
+                    }
+                }
+            }
+            // if we found a page after all that, go ahead and finish
+            if (returnPage != null) {
+                ((CachedPage) returnPage).confiscated.set(true);
+                if (DEBUG) {
+                    confiscateLock.lock();
+                    try {
+                        confiscatedPages.add((CachedPage) returnPage);
+                        confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
+                    } finally {
+                        confiscateLock.unlock();
+                    }
+                }
+                return returnPage;
+            }
+            // no page available to confiscate. try kicking the cleaner thread.
+            synchronized (cleanerThread) {
+                pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+            }
+            // Heuristic optimization. Check whether the cleaner thread has
+            // cleaned pages since we did our last pin attempt.
+            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+                // Don't go to sleep and wait for notification from the cleaner,
+                // just try to pin again immediately.
+                continue;
+            }
+            synchronized (cleanerThread.cleanNotification) {
+                try {
+                    cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                } catch (InterruptedException e) {
+                    // Do nothing
+                }
+            }
+        }
+    }
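
confiscatePage pairs with returnPage below: a confiscated page is unhooked
from the page map (or taken fresh from the replacement strategy), owned
exclusively by the caller, skipped by the cleaner, and eventually either
reinserted into the map or recycled. A minimal usage sketch under those
assumptions (bufferCache and someValue are illustrative; error handling
elided):

    // Illustrative lifecycle of a confiscated page (not from the patch).
    ICachedPage page = bufferCache.confiscatePage(dpid);
    try {
        // Fill the buffer privately; no latching is needed while the
        // page is confiscated, since it is invisible to other threads.
        page.getBuffer().putInt(0, someValue);
    } finally {
        // reinsert == true hashes the page back into the page map;
        // returnPage(page, false) would invalidate and recycle it instead.
        bufferCache.returnPage(page, true);
    }
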
+
+    @Override
+    public void returnPage(ICachedPage page) {
+        returnPage(page, true);
+    }
+
+    @Override
+    public void returnPage(ICachedPage page, boolean reinsert) {
+        CachedPage cPage = (CachedPage) page;
+        CacheBucket bucket = null;
+        if (!page.confiscated()) {
+            return;
+        }
+        if (reinsert) {
+            int hash = hash(cPage.dpid);
+            bucket = pageMap[hash];
+            bucket.bucketLock.lock();
+            if (DEBUG) {
+                confiscateLock.lock();
+            }
+            try {
+                cPage.reset(cPage.dpid);
+                cPage.valid = true;
+                cPage.next = bucket.cachedPage;
+                bucket.cachedPage = cPage;
+                cPage.pinCount.decrementAndGet();
+                if (DEBUG) {
+                    assert cPage.pinCount.get() == 0;
+                    assert cPage.latch.getReadLockCount() == 0;
+                    assert cPage.latch.getWriteHoldCount() == 0;
+                    confiscatedPages.remove(cPage);
+                    confiscatedPagesOwner.remove(cPage);
+                }
+            } finally {
+                bucket.bucketLock.unlock();
+                if (DEBUG) {
+                    confiscateLock.unlock();
+                }
+            }
+        } else {
+            cPage.invalidate();
+            cPage.pinCount.decrementAndGet();
+            if (DEBUG) {
+                assert cPage.pinCount.get() == 0;
+                assert cPage.latch.getReadLockCount() == 0;
+                assert cPage.latch.getWriteHoldCount() == 0;
+                confiscateLock.lock();
+                try {
+                    confiscatedPages.remove(cPage);
+                    confiscatedPagesOwner.remove(cPage);
+                } finally {
+                    confiscateLock.unlock();
+                }
+            }
+        }
+        pageReplacementStrategy.adviseWontNeed(cPage);
+    }
+
+    @Override
+    public void setPageDiskId(ICachedPage page, long dpid) {
+        ((CachedPage) page).dpid = dpid;
+    }
+
+    @Override
+    public IFIFOPageQueue createFIFOQueue() {
+        return fifoWriter.createQueue(FIFOLocalWriter.instance());
+    }
+
+    @Override
+    public void finishQueue() {
+        fifoWriter.finishQueue();
+    }
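
createFIFOQueue and finishQueue bracket the new append-only write path:
pages confiscated by a writer are handed to the queue, flushed in FIFO
order by the AsyncFIFOPageQueueManager, and finishQueue waits for the
queue to drain. A rough sketch of the intended call sequence; the queue's
put method and the fillPage helper are assumptions, not shown in this diff:

    // Hypothetical bulkload-style write path built on the FIFO writer.
    IFIFOPageQueue queue = bufferCache.createFIFOQueue();
    for (long dpid : pageIdsInAppendOrder) {
        ICachedPage page = bufferCache.confiscatePage(dpid);
        fillPage(page);            // serialize data into page.getBuffer()
        queue.put(page);           // assumed API: hand off for async write
    }
    bufferCache.finishQueue();     // block until queued pages reach disk
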
+
+    @Override
+    public void copyPage(ICachedPage src, ICachedPage dst) {
+        CachedPage srcCast = (CachedPage) src;
+        CachedPage dstCast = (CachedPage) dst;
+        System.arraycopy(srcCast.buffer.array(), 0, dstCast.getBuffer().array(), 0, srcCast.buffer.capacity());
+    }
+
+    @Override
     public boolean isReplicationEnabled() {
         if (ioReplicationManager != null) {
             return ioReplicationManager.isReplicationEnabled();
@@ -839,4 +1142,21 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     public IIOReplicationManager getIOReplicationManager() {
         return ioReplicationManager;
     }
+
+    @Override
+    /**
+     * _ONLY_ call this if you absolutely, positively know this file has no dirty pages in the cache!
+     * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately.
+     */
+    public void purgeHandle(int fileId) throws HyracksDataException {
+        synchronized (fileInfoMap) {
+            BufferedFileHandle fh = fileInfoMap.get(fileId);
+            if (fh != null) {
+                ioManager.close(fh.getFileHandle());
+                fileInfoMap.remove(fileId);
+                fileMapManager.unregisterFile(fileId);
+            }
+        }
+    }
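
Note the contrast with deleteFile above: purgeHandle performs no
sweepAndFlush pass, so any of the file's pages still resident in the cache
are simply orphaned. The caller must guarantee that no dirty or pinned
pages remain -- presumably the case for files written once through the
FIFO path and never modified in place.
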
+
 }

