asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [48/50] [abbrv] incubator-asterixdb git commit: Merge remote-tracking branch 'hyracks-local/master' into hyracks-merge2
Date Thu, 07 Apr 2016 15:00:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 8e39223,0000000..e4be66b
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@@ -1,368 -1,0 +1,379 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.storage.am.lsm.common.impls;
 +
 +import java.util.List;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 +import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.IndexException;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
 +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 +
 +public class ExternalIndexHarness extends LSMHarness {
 +    private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
 +
-     public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy,
-             ILSMOperationTracker opTracker, boolean replicationEnabled) {
++    public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
++            boolean replicationEnabled) {
 +        super(lsmIndex, mergePolicy, opTracker, replicationEnabled);
 +    }
 +
 +    @Override
 +    protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType,
 +            boolean isTryOperation) throws HyracksDataException {
++        validateOperationEnterComponentsState(ctx);
 +        synchronized (opTracker) {
 +            while (true) {
 +                lsmIndex.getOperationalComponents(ctx);
 +                // Before entering the components, prune those corner cases that indeed should not proceed.
 +                switch (opType) {
 +                    case MERGE:
 +                        if (ctx.getComponentHolder().size() < 2) {
 +                            // There is only a single component. There is nothing to merge.
 +                            return false;
 +                        }
 +                    default:
 +                        break;
 +                }
 +                if (enterComponents(ctx, opType)) {
 +                    return true;
 +                } else if (isTryOperation) {
 +                    return false;
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    protected boolean enterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType)
 +            throws HyracksDataException {
++        validateOperationEnterComponentsState(ctx);
 +        List<ILSMComponent> components = ctx.getComponentHolder();
 +        int numEntered = 0;
 +        boolean entranceSuccessful = false;
 +        try {
 +            for (ILSMComponent c : components) {
 +                if (!c.threadEnter(opType, false)) {
 +                    break;
 +                }
 +                numEntered++;
 +            }
 +            entranceSuccessful = numEntered == components.size();
 +        } finally {
 +            if (!entranceSuccessful) {
 +                for (ILSMComponent c : components) {
 +                    if (numEntered == 0) {
 +                        break;
 +                    }
 +                    c.threadExit(opType, true, false);
 +                    numEntered--;
 +                }
 +                return false;
 +            }
++            ctx.setAccessingComponents(true);
 +        }
 +        // Check if there is any action that is needed to be taken based on the operation type
 +        switch (opType) {
 +            case MERGE:
 +                lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
 +            default:
 +                break;
 +        }
 +        opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
 +        return true;
 +    }
 +
 +    private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
 +            boolean failedOperation) throws HyracksDataException, IndexException {
++        /**
++         * FLUSH and MERGE operations should always exit the components
++         * to notify waiting threads.
++         */
++        if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
++            return;
++        }
 +        synchronized (opTracker) {
 +            try {
 +                // First check if there is any action that is needed to be taken based on the state of each component.
 +                for (ILSMComponent c : ctx.getComponentHolder()) {
 +                    c.threadExit(opType, failedOperation, false);
 +                    switch (c.getState()) {
 +                        case INACTIVE:
 +                            if (replicationEnabled) {
 +                                componentsToBeReplicated.clear();
 +                                componentsToBeReplicated.add(c);
 +                                lsmIndex.scheduleReplication(null, componentsToBeReplicated, false,
 +                                        ReplicationOperation.DELETE, opType);
 +                            }
 +                            ((AbstractDiskLSMComponent) c).destroy();
 +                            break;
 +                        default:
 +                            break;
 +                    }
 +                }
++                ctx.setAccessingComponents(false);
 +                // Then, perform any action that is needed to be taken based on the operation type.
 +                switch (opType) {
 +                    case MERGE:
 +                        // newComponent is null if the merge op. was not performed.
 +                        if (newComponent != null) {
 +                            beforeSubsumeMergedComponents(newComponent, ctx.getComponentHolder());
 +                            lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
 +                            if (replicationEnabled) {
 +                                componentsToBeReplicated.clear();
 +                                componentsToBeReplicated.add(newComponent);
 +                                triggerReplication(componentsToBeReplicated, false, opType);
 +                            }
 +                            mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
 +                        }
 +                        break;
 +                    default:
 +                        break;
 +                }
 +            } finally {
 +                opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
 +                        ctx.getModificationCallback());
 +            }
 +        }
 +    }
 +
 +    @Override
-     public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
-             IndexException {
++    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
++            throws HyracksDataException, IndexException {
 +        throw new IndexException("2PC LSM Inedx doesn't support modify");
 +    }
 +
 +    @Override
 +    public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
 +            throws HyracksDataException, IndexException {
 +        throw new IndexException("2PC LSM Inedx doesn't support modify");
 +    }
 +
 +    @Override
 +    public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
 +            throws HyracksDataException, IndexException {
 +        LSMOperationType opType = LSMOperationType.SEARCH;
 +        getAndEnterComponents(ctx, opType, false);
 +        try {
 +            lsmIndex.search(ctx, cursor, pred);
 +        } catch (HyracksDataException | IndexException e) {
 +            exitComponents(ctx, opType, null, true);
 +            throw e;
 +        }
 +    }
 +
 +    @Override
 +    public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
 +        if (ctx.getOperation() == IndexOperation.SEARCH) {
 +            try {
 +                exitComponents(ctx, LSMOperationType.SEARCH, null, false);
 +            } catch (IndexException e) {
 +                throw new HyracksDataException(e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException, IndexException {
 +        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
 +            callback.afterFinalize(LSMOperationType.MERGE, null);
 +            return;
 +        }
 +        lsmIndex.scheduleMerge(ctx, callback);
 +    }
 +
 +    @Override
 +    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException, IndexException {
 +        fullMergeIsRequested.set(true);
 +        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
 +            // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
 +            // whenever the current merge has finished, it will schedule the full merge again.
 +            callback.afterFinalize(LSMOperationType.MERGE, null);
 +            return;
 +        }
 +        fullMergeIsRequested.set(false);
 +        lsmIndex.scheduleMerge(ctx, callback);
 +    }
 +
 +    @Override
-     public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-             IndexException {
++    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
++            throws HyracksDataException, IndexException {
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
 +        }
 +
 +        ILSMComponent newComponent = null;
 +        try {
 +            newComponent = lsmIndex.merge(operation);
 +            operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
 +            lsmIndex.markAsValid(newComponent);
 +        } finally {
 +            exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
 +            operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
 +        }
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Finished the merge operation for index: " + lsmIndex);
 +        }
 +    }
 +
 +    @Override
 +    public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
 +        lsmIndex.markAsValid(c);
 +        synchronized (opTracker) {
 +            lsmIndex.addComponent(c);
 +            if (replicationEnabled) {
 +                componentsToBeReplicated.clear();
 +                componentsToBeReplicated.add(c);
 +                triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
 +            }
 +            // Enter the component
 +            enterComponent(c);
 +            mergePolicy.diskComponentAdded(lsmIndex, false);
 +        }
 +    }
 +
 +    // Three differences from addBulkLoadedComponent:
 +    // 1. This needs synchronization since others might be accessing the index (specifically, merge operations that might change the lists of components).
 +    // 2. The actions taken by the index itself are different.
 +    // 3. The component has already been marked valid by the bulk update operation.
 +    public void addTransactionComponents(ILSMComponent newComponent) throws HyracksDataException, IndexException {
 +        ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
 +        synchronized (opTracker) {
 +            List<ILSMComponent> newerList;
 +            List<ILSMComponent> olderList;
 +            if (index.getCurrentVersion() == 0) {
 +                newerList = index.getFirstComponentList();
 +                olderList = index.getSecondComponentList();
 +            } else {
 +                newerList = index.getSecondComponentList();
 +                olderList = index.getFirstComponentList();
 +            }
 +            // Exit components in old version of the index so they are ready to be
 +            // deleted if they are not needed anymore
 +            for (ILSMComponent c : olderList) {
 +                exitComponent(c);
 +            }
 +            // Enter components in the newer list
 +            for (ILSMComponent c : newerList) {
 +                enterComponent(c);
 +            }
 +            if (newComponent != null) {
 +                // Enter new component
 +                enterComponent(newComponent);
 +            }
 +            index.commitTransactionDiskComponent(newComponent);
 +            mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
 +        }
 +    }
 +
 +    @Override
 +    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException {
 +        callback.afterFinalize(LSMOperationType.FLUSH, null);
 +    }
 +
 +    @Override
-     public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-             IndexException {
++    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
++            throws HyracksDataException, IndexException {
 +    }
 +
 +    @Override
 +    public ILSMOperationTracker getOperationTracker() {
 +        return opTracker;
 +    }
 +
 +    public void beforeSubsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents)
 +            throws HyracksDataException {
 +        ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
 +        // check if merge will affect the first list
 +        if (index.getFirstComponentList().containsAll(mergedComponents)) {
 +            // exit un-needed components
 +            for (ILSMComponent c : mergedComponents) {
 +                exitComponent(c);
 +            }
 +            // enter new component
 +            enterComponent(newComponent);
 +        }
 +        // check if merge will affect the second list
 +        if (index.getSecondComponentList().containsAll(mergedComponents)) {
 +            // exit un-needed components
 +            for (ILSMComponent c : mergedComponents) {
 +                exitComponent(c);
 +            }
 +            // enter new component
 +            enterComponent(newComponent);
 +        }
 +    }
 +
 +    // The two methods: enterComponent and exitComponent are used to control
 +    // when components are to be deleted from disk
 +    private void enterComponent(ILSMComponent diskComponent) throws HyracksDataException {
 +        diskComponent.threadEnter(LSMOperationType.SEARCH, false);
 +    }
 +
 +    private void exitComponent(ILSMComponent diskComponent) throws HyracksDataException {
 +        diskComponent.threadExit(LSMOperationType.SEARCH, false, false);
 +        if (diskComponent.getState() == ILSMComponent.ComponentState.INACTIVE) {
 +            if (replicationEnabled) {
 +                componentsToBeReplicated.clear();
 +                componentsToBeReplicated.add(diskComponent);
 +                lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null);
 +            }
 +            ((AbstractDiskLSMComponent) diskComponent).destroy();
 +        }
 +    }
 +
 +    public void indexFirstTimeActivated() throws HyracksDataException {
 +        ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
 +        // Enter the disk components to avoid deleting them while they are
 +        // still needed.
 +        for (ILSMComponent c : index.getFirstComponentList()) {
 +            enterComponent(c);
 +        }
 +        for (ILSMComponent c : index.getSecondComponentList()) {
 +            enterComponent(c);
 +        }
 +    }
 +
 +    public void indexClear() throws HyracksDataException {
 +        ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
 +        for (ILSMComponent c : index.getFirstComponentList()) {
 +            exitComponent(c);
 +        }
 +        for (ILSMComponent c : index.getSecondComponentList()) {
 +            exitComponent(c);
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 21b0d8a,0000000..a19532f
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@@ -1,508 -1,0 +1,526 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.common.impls;
 +
 +import java.util.ArrayList;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 +import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.IndexException;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 +
 +public class LSMHarness implements ILSMHarness {
 +    private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
 +
 +    protected final ILSMIndexInternal lsmIndex;
 +    protected final ILSMMergePolicy mergePolicy;
 +    protected final ILSMOperationTracker opTracker;
 +    protected final AtomicBoolean fullMergeIsRequested;
 +    protected final boolean replicationEnabled;
 +    protected List<ILSMComponent> componentsToBeReplicated;
 +
 +    public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
 +            boolean replicationEnabled) {
 +        this.lsmIndex = lsmIndex;
 +        this.opTracker = opTracker;
 +        this.mergePolicy = mergePolicy;
 +        fullMergeIsRequested = new AtomicBoolean();
-         this.replicationEnabled = replicationEnabled;
++        //only durable indexes are replicated
++        this.replicationEnabled = replicationEnabled && lsmIndex.isDurable();
 +        if (replicationEnabled) {
 +            this.componentsToBeReplicated = new ArrayList<ILSMComponent>();
 +        }
 +    }
 +
 +    protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType,
 +            boolean isTryOperation) throws HyracksDataException {
++        validateOperationEnterComponentsState(ctx);
 +        synchronized (opTracker) {
 +            while (true) {
 +                lsmIndex.getOperationalComponents(ctx);
 +                // Before entering the components, prune those corner cases that indeed should not proceed.
 +                switch (opType) {
 +                    case FLUSH:
 +                        ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
 +                        if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
 +                            //The mutable component has not been modified by any writer. There is nothing to flush.
 +                            //since the component is empty, set its state back to READABLE_WRITABLE
 +                            if (((AbstractLSMIndex) lsmIndex)
 +                                    .getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
 +                                ((AbstractLSMIndex) lsmIndex)
 +                                        .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
 +                            }
 +                            return false;
 +                        }
 +                        if (((AbstractMemoryLSMComponent) flushingComponent).getWriterCount() > 0) {
 +                            /*
 +                             * This case is a case where even though FLUSH log was flushed to disk and scheduleFlush is triggered,
 +                             * the current in-memory component (whose state was changed to READABLE_WRITABLE (RW)
 +                             * from READABLE_UNWRITABLE(RU) before FLUSH log was written to log tail (which is memory buffer of log file)
 +                             * and then the state was changed back to RW (as shown in the following scenario)) can have writers
 +                             * based on the current code base/design.
 +                             * Thus, the writer count of the component may be greater than 0.
 +                             * If this happens, instead of throwing an exception, scheduleFlush() deals with this situation by not flushing
 +                             * the component.
 +                             * Please see issue 884 for more detailed information:
 +                             * https://code.google.com/p/asterixdb/issues/detail?id=884&q=owner%3Akisskys%40gmail.com&colspec=ID%20Type%20Status%20Priority%20Milestone%20Owner%20Summary%20ETA%20Severity
 +                             *
 +                             */
 +                            return false;
 +                        }
 +                        break;
 +                    case MERGE:
 +                        if (ctx.getComponentHolder().size() < 2) {
 +                            // There is only a single component. There is nothing to merge.
 +                            return false;
 +                        }
 +                    default:
 +                        break;
 +                }
 +                if (enterComponents(ctx, opType)) {
 +                    return true;
 +                } else if (isTryOperation) {
 +                    return false;
 +                }
 +                try {
 +                    // Flush and merge operations should never reach this wait call, because they are always try operations.
 +                    // If they fail to enter the components, then it means that there is an ongoing flush/merge operation on
 +                    // the same components, so they should not proceed.
 +                    opTracker.wait();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +        }
 +    }
 +
 +    protected boolean enterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType)
 +            throws HyracksDataException {
++        validateOperationEnterComponentsState(ctx);
 +        List<ILSMComponent> components = ctx.getComponentHolder();
 +        int numEntered = 0;
 +        boolean entranceSuccessful = false;
 +        try {
 +            for (ILSMComponent c : components) {
 +                boolean isMutableComponent = numEntered == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
 +                if (!c.threadEnter(opType, isMutableComponent)) {
 +                    break;
 +                }
 +                numEntered++;
 +            }
 +            entranceSuccessful = numEntered == components.size();
 +        } catch (Throwable e) {
 +            e.printStackTrace();
 +            throw e;
 +        } finally {
 +            if (!entranceSuccessful) {
 +                int i = 0;
 +                for (ILSMComponent c : components) {
 +                    if (numEntered == 0) {
 +                        break;
 +                    }
 +                    boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
 +                    c.threadExit(opType, true, isMutableComponent);
 +                    i++;
 +                    numEntered--;
 +                }
 +                return false;
 +            }
++            ctx.setAccessingComponents(true);
 +        }
 +        // Check if there is any action that is needed to be taken based on the operation type
 +        switch (opType) {
 +            case FLUSH:
 +                lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH);
 +                // Changing the flush status should *always* precede changing the mutable component.
 +                lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
 +                lsmIndex.changeMutableComponent();
 +                // Notify all waiting threads whenever a flush has been scheduled since they will check
 +                // again if they can grab and enter the mutable component.
 +                opTracker.notifyAll();
 +                break;
 +            case MERGE:
 +                lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
 +            default:
 +                break;
 +        }
 +        opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
 +        return true;
 +    }
 +
 +    private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
 +            boolean failedOperation) throws HyracksDataException, IndexException {
++        /**
++         * FLUSH and MERGE operations should always exit the components
++         * to notify waiting threads.
++         */
++        if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
++            return;
++        }
 +        List<ILSMComponent> inactiveDiskComponents = null;
 +        List<ILSMComponent> inactiveDiskComponentsToBeDeleted = null;
 +        try {
 +            synchronized (opTracker) {
 +                try {
 +                    int i = 0;
 +                    // First check if there is any action that is needed to be taken based on the state of each component.
 +                    for (ILSMComponent c : ctx.getComponentHolder()) {
 +                        boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
 +                        c.threadExit(opType, failedOperation, isMutableComponent);
 +                        if (c.getType() == LSMComponentType.MEMORY) {
 +                            switch (c.getState()) {
 +                                case READABLE_UNWRITABLE:
 +                                    if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
 +                                            || opType == LSMOperationType.FORCE_MODIFICATION)) {
 +                                        lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
 +                                    }
 +                                    break;
 +                                case INACTIVE:
 +                                    ((AbstractMemoryLSMComponent) c).reset();
 +                                    // Notify all waiting threads whenever the mutable component's state has changed to inactive. This is important because
 +                                    // even though we switched the mutable components, it is possible that the component that we just switched
 +                                    // to is still busy flushing its data to disk. Thus, the notification that was issued upon scheduling the flush
 +                                    // is not enough.
 +                                    opTracker.notifyAll();
 +                                    break;
 +                                default:
 +                                    break;
 +                            }
 +                        } else {
 +                            switch (c.getState()) {
 +                                case INACTIVE:
 +                                    lsmIndex.addInactiveDiskComponent(c);
 +                                    break;
 +                                default:
 +                                    break;
 +                            }
 +                        }
 +                        i++;
 +                    }
++                    ctx.setAccessingComponents(false);
 +                    // Then, perform any action that is needed to be taken based on the operation type.
 +                    switch (opType) {
 +                        case FLUSH:
 +                            // newComponent is null if the flush op. was not performed.
 +                            if (newComponent != null) {
 +                                lsmIndex.addComponent(newComponent);
 +                                if (replicationEnabled) {
 +                                    componentsToBeReplicated.clear();
 +                                    componentsToBeReplicated.add(newComponent);
 +                                    triggerReplication(componentsToBeReplicated, false, opType);
 +                                }
 +                                mergePolicy.diskComponentAdded(lsmIndex, false);
 +                            }
 +                            break;
 +                        case MERGE:
 +                            // newComponent is null if the merge op. was not performed.
 +                            if (newComponent != null) {
 +                                lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
 +                                if (replicationEnabled) {
 +                                    componentsToBeReplicated.clear();
 +                                    componentsToBeReplicated.add(newComponent);
 +                                    triggerReplication(componentsToBeReplicated, false, opType);
 +                                }
 +                                mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
 +                            }
 +                            break;
 +                        default:
 +                            break;
 +                    }
 +                } catch (Throwable e) {
 +                    e.printStackTrace();
 +                    throw e;
 +                } finally {
 +                    if (failedOperation && (opType == LSMOperationType.MODIFICATION
 +                            || opType == LSMOperationType.FORCE_MODIFICATION)) {
 +                        //When the operation failed, completeOperation() method must be called
 +                        //in order to decrement active operation count which was incremented in beforeOperation() method.
 +                        opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
 +                                ctx.getModificationCallback());
 +                    } else {
 +                        opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
 +                                ctx.getModificationCallback());
 +                    }
 +
 +                    /*
 +                     * = Inactive disk components lazy cleanup if any =
 +                     * Prepare to clean up inactive disk components, i.e. old merged components
 +                     * that are no longer accessed.
 +                     * This cleanup is done outside of optracker synchronized block.
 +                     */
 +                    inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
 +                    if (!inactiveDiskComponents.isEmpty()) {
 +                        for (ILSMComponent inactiveComp : inactiveDiskComponents) {
 +                            if (((AbstractDiskLSMComponent) inactiveComp).getFileReferenceCount() == 1) {
 +                                if (inactiveDiskComponentsToBeDeleted == null) {
 +                                    inactiveDiskComponentsToBeDeleted = new LinkedList<ILSMComponent>();
 +                                }
 +                                inactiveDiskComponentsToBeDeleted.add(inactiveComp);
 +                            }
 +                        }
 +                        if (inactiveDiskComponentsToBeDeleted != null) {
 +                            inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
 +                        }
 +                    }
 +                }
 +            }
 +        } finally {
 +            /*
 +             * cleanup inactive disk components if any
 +             */
 +            if (inactiveDiskComponentsToBeDeleted != null) {
 +                try {
 +                    //schedule a replication job to delete these inactive disk components from replicas
 +                    if (replicationEnabled) {
 +                        lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
 +                                ReplicationOperation.DELETE, opType);
 +                    }
 +
 +                    for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
 +                        ((AbstractDiskLSMComponent) c).destroy();
 +                    }
 +                } catch (Throwable e) {
 +                    e.printStackTrace();
 +                    throw e;
 +                }
 +            }
 +        }
 +
 +    }
 +
 +    @Override
 +    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
 +            throws HyracksDataException, IndexException {
 +        LSMOperationType opType = LSMOperationType.FORCE_MODIFICATION;
 +        modify(ctx, false, tuple, opType);
 +    }
 +
 +    @Override
 +    public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
 +            throws HyracksDataException, IndexException {
 +        LSMOperationType opType = LSMOperationType.MODIFICATION;
 +        return modify(ctx, tryOperation, tuple, opType);
 +    }
 +
 +    private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple,
 +            LSMOperationType opType) throws HyracksDataException, IndexException {
 +        if (!lsmIndex.isMemoryComponentsAllocated()) {
 +            lsmIndex.allocateMemoryComponents();
 +        }
 +        boolean failedOperation = false;
 +        if (!getAndEnterComponents(ctx, opType, tryOperation)) {
 +            return false;
 +        }
 +        try {
 +            lsmIndex.modify(ctx, tuple);
 +            // The mutable component is always in the first index.
 +            AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) ctx.getComponentHolder().get(0);
 +            mutableComponent.setIsModified();
 +        } catch (Exception e) {
 +            failedOperation = true;
 +            throw e;
 +        } finally {
 +            exitComponents(ctx, opType, null, failedOperation);
 +        }
 +        return true;
 +    }
 +
 +    @Override
 +    public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
 +            throws HyracksDataException, IndexException {
 +        LSMOperationType opType = LSMOperationType.SEARCH;
 +        ctx.setSearchPredicate(pred);
 +        getAndEnterComponents(ctx, opType, false);
 +        try {
 +            ctx.getSearchOperationCallback().before(pred.getLowKey());
 +            lsmIndex.search(ctx, cursor, pred);
 +        } catch (HyracksDataException | IndexException e) {
 +            exitComponents(ctx, opType, null, true);
 +            throw e;
 +        }
 +    }
 +
 +    @Override
 +    public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
 +        if (ctx.getOperation() == IndexOperation.SEARCH) {
 +            try {
 +                exitComponents(ctx, LSMOperationType.SEARCH, null, false);
 +            } catch (IndexException e) {
 +                throw new HyracksDataException(e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException {
 +        if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
 +            callback.afterFinalize(LSMOperationType.FLUSH, null);
 +            return;
 +        }
 +        lsmIndex.scheduleFlush(ctx, callback);
 +    }
 +
 +    @Override
 +    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
 +            throws HyracksDataException, IndexException {
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
 +        }
 +
 +        ILSMComponent newComponent = null;
 +        try {
 +            newComponent = lsmIndex.flush(operation);
 +            operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
 +            lsmIndex.markAsValid(newComponent);
 +        } catch (Throwable e) {
 +            e.printStackTrace();
 +            throw e;
 +        } finally {
 +            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
 +            operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
 +        }
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Finished the flush operation for index: " + lsmIndex);
 +        }
 +    }
 +
 +    @Override
 +    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException, IndexException {
 +        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
 +            callback.afterFinalize(LSMOperationType.MERGE, null);
 +            return;
 +        }
 +        lsmIndex.scheduleMerge(ctx, callback);
 +    }
 +
 +    @Override
 +    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
 +            throws HyracksDataException, IndexException {
 +        fullMergeIsRequested.set(true);
 +        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
 +            // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
 +            // whenever the current merge has finished, it will schedule the full merge again.
 +            callback.afterFinalize(LSMOperationType.MERGE, null);
 +            return;
 +        }
 +        fullMergeIsRequested.set(false);
 +        lsmIndex.scheduleMerge(ctx, callback);
 +    }
 +
 +    @Override
 +    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
 +            throws HyracksDataException, IndexException {
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
 +        }
 +
 +        ILSMComponent newComponent = null;
 +        try {
 +            newComponent = lsmIndex.merge(operation);
 +            operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
 +            lsmIndex.markAsValid(newComponent);
 +        } catch (Throwable e) {
 +            e.printStackTrace();
 +            throw e;
 +        } finally {
 +            exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
 +            operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
 +        }
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Finished the merge operation for index: " + lsmIndex);
 +        }
 +    }
 +
 +    @Override
 +    public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
 +        lsmIndex.markAsValid(c);
 +        synchronized (opTracker) {
 +            lsmIndex.addComponent(c);
 +            if (replicationEnabled) {
 +                componentsToBeReplicated.clear();
 +                componentsToBeReplicated.add(c);
 +                triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
 +            }
 +            mergePolicy.diskComponentAdded(lsmIndex, false);
 +        }
 +    }
 +
 +    @Override
 +    public ILSMOperationTracker getOperationTracker() {
 +        return opTracker;
 +    }
 +
 +    protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
 +            throws HyracksDataException {
 +        ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
 +                NoOpOperationCallback.INSTANCE);
 +        accessor.scheduleReplication(lsmComponents, bulkload, opType);
 +    }
 +
 +    @Override
 +    public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
 +            LSMOperationType opType) throws HyracksDataException {
 +
 +        //enter the LSM components to be replicated to prevent them from being deleted until they are replicated
 +        if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
 +            return;
 +        }
 +
 +        lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType);
 +    }
 +
 +    @Override
 +    public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException {
 +        try {
 +            exitComponents(ctx, LSMOperationType.REPLICATE, null, false);
 +        } catch (IndexException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
++
++    protected void validateOperationEnterComponentsState(ILSMIndexOperationContext ctx) throws HyracksDataException {
++        if (ctx.isAccessingComponents()) {
++            throw new HyracksDataException("Opeartion already has access to components of index " + lsmIndex);
++        }
++    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index c4d2fcc,0000000..befdd85
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@@ -1,306 -1,0 +1,308 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.common.impls;
 +
 +import java.util.Comparator;
 +import java.util.List;
 +import java.util.PriorityQueue;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 +import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 +import org.apache.hyracks.storage.am.common.api.IndexException;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
 +import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 +import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 +
 +public abstract class LSMIndexSearchCursor implements ITreeIndexCursor {
 +    protected final ILSMIndexOperationContext opCtx;
 +    protected final boolean returnDeletedTuples;
 +    protected PriorityQueueElement outputElement;
 +    protected IIndexCursor[] rangeCursors;
 +    protected PriorityQueueElement[] pqes;
 +    protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
 +    protected PriorityQueueComparator pqCmp;
 +    protected MultiComparator cmp;
 +    protected boolean needPush;
 +    protected boolean includeMutableComponent;
 +    protected ILSMHarness lsmHarness;
 +
 +    protected List<ILSMComponent> operationalComponents;
 +
 +    public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
 +        this.opCtx = opCtx;
 +        this.returnDeletedTuples = returnDeletedTuples;
 +        outputElement = null;
 +        needPush = false;
 +    }
 +
 +    public ILSMIndexOperationContext getOpCtx() {
 +        return opCtx;
 +    }
 +
 +    public void initPriorityQueue() throws HyracksDataException, IndexException {
 +        int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
 +        if (outputPriorityQueue == null) {
 +            outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
 +            pqes = new PriorityQueueElement[pqInitSize];
 +            for (int i = 0; i < pqInitSize; i++) {
 +                pqes[i] = new PriorityQueueElement(i);
 +            }
 +            for (int i = 0; i < rangeCursors.length; i++) {
 +                pushIntoPriorityQueue(pqes[i]);
 +            }
 +        } else {
 +            outputPriorityQueue.clear();
 +            // did size change?
 +            if (pqInitSize == pqes.length) {
 +                // size is the same -> re-use
 +                for (int i = 0; i < rangeCursors.length; i++) {
 +                    pqes[i].reset(null);
 +                    pushIntoPriorityQueue(pqes[i]);
 +                }
 +            } else {
 +                // size changed (due to flushes, merges, etc) -> re-create
 +                pqes = new PriorityQueueElement[pqInitSize];
 +                for (int i = 0; i < rangeCursors.length; i++) {
 +                    pqes[i] = new PriorityQueueElement(i);
 +                    pushIntoPriorityQueue(pqes[i]);
 +                }
 +            }
 +        }
 +    }
 +
 +    public IIndexCursor getCursor(int cursorIndex) {
 +        return rangeCursors[cursorIndex];
 +    }
 +
 +    @Override
 +    public void reset() throws HyracksDataException, IndexException {
 +        outputElement = null;
 +        needPush = false;
 +
 +        try {
 +            if (outputPriorityQueue != null) {
 +                outputPriorityQueue.clear();
 +            }
 +
 +            if (rangeCursors != null) {
 +                for (int i = 0; i < rangeCursors.length; i++) {
 +                    rangeCursors[i].reset();
 +                }
 +            }
 +            rangeCursors = null;
 +        } finally {
 +            if (lsmHarness != null) {
 +                lsmHarness.endSearch(opCtx);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public boolean hasNext() throws HyracksDataException, IndexException {
 +        checkPriorityQueue();
 +        return !outputPriorityQueue.isEmpty();
 +    }
 +
 +    @Override
 +    public void next() throws HyracksDataException {
 +        outputElement = outputPriorityQueue.poll();
 +        needPush = true;
 +    }
 +
 +    @Override
 +    public ICachedPage getPage() {
 +        // do nothing
 +        return null;
 +    }
 +
 +    @Override
 +    public void close() throws HyracksDataException {
 +        try {
-             outputPriorityQueue.clear();
++            if (outputPriorityQueue != null) {
++                outputPriorityQueue.clear();
++            }
 +            for (int i = 0; i < rangeCursors.length; i++) {
 +                rangeCursors[i].close();
 +            }
 +            rangeCursors = null;
 +        } finally {
 +            if (lsmHarness != null) {
 +                lsmHarness.endSearch(opCtx);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void setBufferCache(IBufferCache bufferCache) {
 +        // do nothing
 +    }
 +
 +    @Override
 +    public void setFileId(int fileId) {
 +        // do nothing
 +    }
 +
 +    @Override
 +    public ITupleReference getTuple() {
 +        return outputElement.getTuple();
 +    }
 +
 +    protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException {
 +        int cursorIndex = e.getCursorIndex();
 +        if (rangeCursors[cursorIndex].hasNext()) {
 +            rangeCursors[cursorIndex].next();
 +            e.reset(rangeCursors[cursorIndex].getTuple());
 +            outputPriorityQueue.offer(e);
 +            return true;
 +        }
 +        rangeCursors[cursorIndex].close();
 +        return false;
 +    }
 +
 +    protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
 +        return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
 +    }
 +
 +    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
 +        while (!outputPriorityQueue.isEmpty() || (needPush == true)) {
 +            if (!outputPriorityQueue.isEmpty()) {
 +                PriorityQueueElement checkElement = outputPriorityQueue.peek();
 +                // If there is no previous tuple or the previous tuple can be ignored
 +                if (outputElement == null) {
 +                    if (isDeleted(checkElement) && !returnDeletedTuples) {
 +                        // If the key has been deleted then pop it and set needPush to true.
 +                        // We cannot push immediately because the tuple may be
 +                        // modified if hasNext() is called
 +                        outputElement = outputPriorityQueue.poll();
 +                        needPush = true;
 +                    } else {
 +                        break;
 +                    }
 +                } else {
 +                    // Compare the previous tuple and the head tuple in the PQ
 +                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
 +                        // If the previous tuple and the head tuple are
 +                        // identical
 +                        // then pop the head tuple and push the next tuple from
 +                        // the tree of head tuple
 +
 +                        // the head element of PQ is useless now
 +                        PriorityQueueElement e = outputPriorityQueue.poll();
 +                        pushIntoPriorityQueue(e);
 +                    } else {
 +                        // If the previous tuple and the head tuple are different
 +                        // the info of previous tuple is useless
 +                        if (needPush == true) {
 +                            pushIntoPriorityQueue(outputElement);
 +                            needPush = false;
 +                        }
 +                        outputElement = null;
 +                    }
 +                }
 +            } else {
 +                // the priority queue is empty and needPush
 +                pushIntoPriorityQueue(outputElement);
 +                needPush = false;
 +                outputElement = null;
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public boolean exclusiveLatchNodes() {
 +        return false;
 +    }
 +
 +    public class PriorityQueueElement {
 +        private ITupleReference tuple;
 +        private int cursorIndex;
 +
 +        public PriorityQueueElement(int cursorIndex) {
 +            tuple = null;
 +            this.cursorIndex = cursorIndex;
 +        }
 +
 +        public ITupleReference getTuple() {
 +            return tuple;
 +        }
 +
 +        public int getCursorIndex() {
 +            return cursorIndex;
 +        }
 +
 +        public void reset(ITupleReference tuple) {
 +            this.tuple = tuple;
 +        }
 +    }
 +
 +    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
 +
 +        protected MultiComparator cmp;
 +
 +        public PriorityQueueComparator(MultiComparator cmp) {
 +            this.cmp = cmp;
 +        }
 +
 +        @Override
 +        public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
 +            int result;
 +            try {
 +                result = cmp.compare(elementA.getTuple(), elementB.getTuple());
 +                if (result != 0) {
 +                    return result;
 +                }
 +            } catch (HyracksDataException e) {
 +                throw new IllegalArgumentException(e);
 +            }
 +
 +            if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
 +                return 1;
 +            } else {
 +                return -1;
 +            }
 +        }
 +
 +        public MultiComparator getMultiComparator() {
 +            return cmp;
 +        }
 +    }
 +
 +    protected void setPriorityQueueComparator() {
 +        if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
 +            pqCmp = new PriorityQueueComparator(cmp);
 +        }
 +    }
 +
 +    protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB)
 +            throws HyracksDataException {
 +        return cmp.compare(tupleA, tupleB);
 +    }
 +
 +    @Override
 +    public void markCurrentTupleAsUpdated() throws HyracksDataException {
 +        throw new HyracksDataException("Updating tuples is not supported with this cursor.");
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index c511a67,0000000..828e296
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@@ -1,169 -1,0 +1,170 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
 +
- public class LSMInvertedIndexOpContext implements ILSMIndexOperationContext {
++public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext {
 +
 +    private static final int NUM_DOCUMENT_FIELDS = 1;
 +
 +    private IndexOperation op;
 +    private final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +
 +    private IModificationOperationCallback modificationCallback;
 +    private ISearchOperationCallback searchCallback;
 +
 +    // Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
 +    public PermutingTupleReference keysOnlyTuple;
 +
 +    // Accessor to the in-memory inverted indexes.
 +    public IInvertedIndexAccessor[] mutableInvIndexAccessors;
 +    // Accessor to the deleted-keys BTrees.
 +    public IIndexAccessor[] deletedKeysBTreeAccessors;
 +
 +    public IInvertedIndexAccessor currentMutableInvIndexAccessors;
 +    public IIndexAccessor currentDeletedKeysBTreeAccessors;
 +
 +    public final PermutingTupleReference indexTuple;
 +    public final MultiComparator filterCmp;
 +    public final PermutingTupleReference filterTuple;
 +
 +    public ISearchPredicate searchPredicate;
 +
 +    public LSMInvertedIndexOpContext(List<ILSMComponent> mutableComponents,
 +            IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback,
 +            int[] invertedIndexFields, int[] filterFields) throws HyracksDataException {
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.modificationCallback = modificationCallback;
 +        this.searchCallback = searchCallback;
 +
 +        mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()];
 +        deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()];
 +
 +        for (int i = 0; i < mutableComponents.size(); i++) {
 +            LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents
 +                    .get(i);
 +            mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex()
 +                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 +            deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree()
 +                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 +        }
 +
 +        assert mutableComponents.size() > 0;
 +
 +        // Project away the document fields, leaving only the key fields.
 +        LSMInvertedIndexMemoryComponent c = (LSMInvertedIndexMemoryComponent) mutableComponents.get(0);
 +        int numKeyFields = c.getInvIndex().getInvListTypeTraits().length;
 +        int[] keyFieldPermutation = new int[numKeyFields];
 +        for (int i = 0; i < numKeyFields; i++) {
 +            keyFieldPermutation[i] = NUM_DOCUMENT_FIELDS + i;
 +        }
 +        keysOnlyTuple = new PermutingTupleReference(keyFieldPermutation);
 +
 +        if (filterFields != null) {
 +            indexTuple = new PermutingTupleReference(invertedIndexFields);
 +            filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
 +            filterTuple = new PermutingTupleReference(filterFields);
 +        } else {
 +            indexTuple = null;
 +            filterCmp = null;
 +            filterTuple = null;
 +        }
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +        componentsToBeReplicated.clear();
 +    }
 +
 +    @Override
 +    // TODO: Ignore opcallback for now.
 +    public void setOperation(IndexOperation newOp) throws HyracksDataException {
 +        reset();
 +        op = newOp;
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return modificationCallback;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
 +        currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
index 6a9a640,0000000..358a42a
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
@@@ -1,134 -1,0 +1,135 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.storage.am.lsm.rtree.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +
- public class ExternalRTreeOpContext implements ILSMIndexOperationContext {
++public class ExternalRTreeOpContext extends AbstractLSMIndexOperationContext {
 +    private IndexOperation op;
 +    private MultiComparator bTreeCmp;
 +    private MultiComparator rTreeCmp;
 +    public final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +    public final ISearchOperationCallback searchCallback;
 +    private final int targetIndexVersion;
 +    public ISearchPredicate searchPredicate;
 +    public LSMRTreeCursorInitialState initialState;
 +
 +    public ExternalRTreeOpContext(IBinaryComparatorFactory[] rtreeCmpFactories,
 +            IBinaryComparatorFactory[] btreeCmpFactories, ISearchOperationCallback searchCallback,
 +            int targetIndexVersion, ILSMHarness lsmHarness, int[] comparatorFields,
 +            IBinaryComparatorFactory[] linearizerArray, ITreeIndexFrameFactory rtreeLeafFrameFactory,
 +            ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory) {
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.searchCallback = searchCallback;
 +        this.targetIndexVersion = targetIndexVersion;
 +        this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
 +        this.rTreeCmp = MultiComparator.create(rtreeCmpFactories);
 +        initialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory, rtreeInteriorFrameFactory,
 +                btreeLeafFrameFactory, bTreeCmp, lsmHarness, comparatorFields, linearizerArray, searchCallback,
 +                componentHolder);
 +    }
 +
 +    @Override
 +    public void setOperation(IndexOperation newOp) {
 +        reset();
 +        this.op = newOp;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        // Do nothing. this should never be called for disk only indexes
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +        componentsToBeReplicated.clear();
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    public MultiComparator getBTreeMultiComparator() {
 +        return bTreeCmp;
 +    }
 +
 +    public MultiComparator getRTreeMultiComparator() {
 +        return rTreeCmp;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    // This should never be needed for disk only indexes
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return null;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    public int getTargetIndexVersion() {
 +        return targetIndexVersion;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 686cd2b,0000000..62f572f
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@@ -1,181 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.rtree.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.storage.am.btree.impls.BTree;
 +import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.rtree.impls.RTree;
 +import org.apache.hyracks.storage.am.rtree.impls.RTreeOpContext;
 +
- public final class LSMRTreeOpContext implements ILSMIndexOperationContext {
++public final class LSMRTreeOpContext extends AbstractLSMIndexOperationContext {
 +
 +    public RTree.RTreeAccessor[] mutableRTreeAccessors;
 +    public RTree.RTreeAccessor currentMutableRTreeAccessor;
 +    public BTree.BTreeAccessor[] mutableBTreeAccessors;
 +    public BTree.BTreeAccessor currentMutableBTreeAccessor;
 +
 +    public RTreeOpContext[] rtreeOpContexts;
 +    public BTreeOpContext[] btreeOpContexts;
 +    public RTreeOpContext currentRTreeOpContext;
 +    public BTreeOpContext currentBTreeOpContext;
 +
 +    private IndexOperation op;
 +    public final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +    private IModificationOperationCallback modificationCallback;
 +    private ISearchOperationCallback searchCallback;
 +    public final PermutingTupleReference indexTuple;
 +    public final MultiComparator filterCmp;
 +    public final PermutingTupleReference filterTuple;
 +    public ISearchPredicate searchPredicate;
 +    public LSMRTreeCursorInitialState searchInitialState;
 +
 +    public LSMRTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory rtreeLeafFrameFactory,
 +            ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
 +            ITreeIndexFrameFactory btreeInteriorFrameFactory, IBinaryComparatorFactory[] rtreeCmpFactories,
 +            IBinaryComparatorFactory[] btreeCmpFactories, IModificationOperationCallback modificationCallback,
 +            ISearchOperationCallback searchCallback, int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness,
 +            int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray) {
 +        mutableRTreeAccessors = new RTree.RTreeAccessor[mutableComponents.size()];
 +        mutableBTreeAccessors = new BTree.BTreeAccessor[mutableComponents.size()];
 +        rtreeOpContexts = new RTreeOpContext[mutableComponents.size()];
 +        btreeOpContexts = new BTreeOpContext[mutableComponents.size()];
 +
 +        LSMRTreeMemoryComponent c = (LSMRTreeMemoryComponent) mutableComponents.get(0);
 +
 +        for (int i = 0; i < mutableComponents.size(); i++) {
 +            LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) mutableComponents.get(i);
 +            mutableRTreeAccessors[i] = (RTree.RTreeAccessor) mutableComponent.getRTree()
 +                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 +            mutableBTreeAccessors[i] = (BTree.BTreeAccessor) mutableComponent.getBTree()
 +                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 +
 +            rtreeOpContexts[i] = mutableRTreeAccessors[i].getOpContext();
 +            btreeOpContexts[i] = mutableBTreeAccessors[i].getOpContext();
 +        }
 +
 +        assert mutableComponents.size() > 0;
 +        currentRTreeOpContext = rtreeOpContexts[0];
 +        currentBTreeOpContext = btreeOpContexts[0];
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.modificationCallback = modificationCallback;
 +        this.searchCallback = searchCallback;
 +
 +        if (filterFields != null) {
 +            indexTuple = new PermutingTupleReference(rtreeFields);
 +            filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
 +            filterTuple = new PermutingTupleReference(filterFields);
 +        } else {
 +            indexTuple = null;
 +            filterCmp = null;
 +            filterTuple = null;
 +        }
 +        searchInitialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory, rtreeInteriorFrameFactory,
 +                btreeLeafFrameFactory, getBTreeMultiComparator(), lsmHarness, comparatorFields, linearizerArray,
 +                searchCallback, componentHolder);
 +    }
 +
 +    @Override
 +    public void setOperation(IndexOperation newOp) {
 +        reset();
 +        this.op = newOp;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        currentMutableRTreeAccessor = mutableRTreeAccessors[currentMutableComponentId];
 +        currentMutableBTreeAccessor = mutableBTreeAccessors[currentMutableComponentId];
 +        currentRTreeOpContext = rtreeOpContexts[currentMutableComponentId];
 +        currentBTreeOpContext = btreeOpContexts[currentMutableComponentId];
 +        if (op == IndexOperation.INSERT) {
 +            currentRTreeOpContext.setOperation(op);
 +        } else if (op == IndexOperation.DELETE) {
 +            currentBTreeOpContext.setOperation(IndexOperation.INSERT);
 +        }
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    public MultiComparator getBTreeMultiComparator() {
 +        return currentBTreeOpContext.cmp;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return modificationCallback;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +}


Mime
View raw message