asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [12/28] asterixdb git commit: Introduce IStorageComponentProvider
Date Thu, 02 Feb 2017 18:24:21 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
new file mode 100644
index 0000000..c6e0a6b
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.OptimizationConfUtil;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+/**
+ * Static helpers for building Hyracks jobs that create, load, compact, and drop dataset indexes.
+ */
+public class IndexUtil {
+
+    //TODO: replace this null with an empty array. currently, this breaks many tests
+    private static final int[] EMPTY = null;
+    // Shared optimization config used when constructing secondary-index operation helpers.
+    private static final PhysicalOptimizationConfig PHYSICAL_OPTIMIZATION_CONFIG =
+            OptimizationConfUtil.getPhysicalOptimizationConfig();
+
+    private IndexUtil() {
+    }
+
+    /**
+     * Computes the positions of the filter fields for {@code index}.
+     *
+     * @param dataset
+     *            the dataset owning the index.
+     * @param index
+     *            the index to consider.
+     * @param filterTypeTraits
+     *            type traits of the filter fields; null when the dataset has no filter.
+     * @return the filter field positions, or null when there is no filter.
+     * @throws AlgebricksException
+     */
+    public static int[] getFilterFields(Dataset dataset, Index index, ITypeTraits[] filterTypeTraits)
+            throws AlgebricksException {
+        if (index.isPrimaryIndex()) {
+            return DatasetUtil.createFilterFields(dataset);
+        }
+        return secondaryFilterFields(dataset, index, filterTypeTraits);
+    }
+
+    /**
+     * Computes the BTree field positions for a filtered dataset.
+     *
+     * @param dataset
+     *            the dataset owning the index.
+     * @param index
+     *            the index to consider.
+     * @return for a secondary index, the identity mapping over the secondary + primary key fields;
+     *         for the primary index, the dataset's own BTree fields.
+     * @throws AlgebricksException
+     */
+    public static int[] getBtreeFieldsIfFiltered(Dataset dataset, Index index) throws AlgebricksException {
+        if (index.isPrimaryIndex()) {
+            return DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
+        }
+        int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        int[] btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int k = 0; k < btreeFields.length; k++) {
+            btreeFields[k] = k;
+        }
+        return btreeFields;
+    }
+
+    /**
+     * Computes the filter field positions for a secondary index. Only BTree secondary indexes
+     * currently carry a filter position; the other supported index types yield no filter fields.
+     *
+     * @throws CompilationException
+     *             if the index type is unknown.
+     */
+    private static int[] secondaryFilterFields(Dataset dataset, Index index, ITypeTraits[] filterTypeTraits)
+            throws CompilationException {
+        if (filterTypeTraits == null) {
+            return EMPTY;
+        }
+        int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        switch (index.getIndexType()) {
+            case BTREE:
+                // The filter field is stored right after the primary and secondary key fields.
+                return new int[] { numPrimaryKeys + numSecondaryKeys };
+            case RTREE:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case SINGLE_PARTITION_WORD_INVIX:
+                break;
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
+                        index.getIndexType().toString());
+        }
+        return EMPTY;
+    }
+
+    /**
+     * Builds a job specification that drops {@code index}.
+     */
+    public static JobSpecification dropJob(Index index, MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        return buildIndexDropJobSpec(index, metadataProvider, dataset);
+    }
+
+    /**
+     * Builds a job specification that creates the secondary index {@code index}.
+     */
+    public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,
+            ARecordType recType, ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        return createOperationsHelper(dataset, index, recType, metaType, enforcedType, enforcedMetaType,
+                metadataProvider).buildCreationJobSpec();
+    }
+
+    /**
+     * Builds a job specification that bulk-loads the secondary index {@code index}.
+     */
+    public static JobSpecification buildSecondaryIndexLoadingJobSpec(Dataset dataset, Index index, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        return createOperationsHelper(dataset, index, recType, metaType, enforcedType, enforcedMetaType,
+                metadataProvider).buildLoadingJobSpec();
+    }
+
+    /**
+     * Builds a job specification that bulk-loads the secondary index {@code index} from the given
+     * external files.
+     */
+    public static JobSpecification buildSecondaryIndexLoadingJobSpec(Dataset dataset, Index index, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType,
+            MetadataProvider metadataProvider, List<ExternalFile> files) throws AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = createOperationsHelper(dataset, index, recType,
+                metaType, enforcedType, enforcedMetaType, metadataProvider);
+        secondaryIndexHelper.setExternalFiles(files);
+        return secondaryIndexHelper.buildLoadingJobSpec();
+    }
+
+    /**
+     * Builds a job specification that drops the secondary index {@code index}.
+     */
+    public static JobSpecification buildDropSecondaryIndexJobSpec(Index index, MetadataProvider metadataProvider,
+            Dataset dataset) throws AlgebricksException {
+        return buildIndexDropJobSpec(index, metadataProvider, dataset);
+    }
+
+    /**
+     * Builds a job specification that compacts the secondary index {@code index}.
+     */
+    public static JobSpecification buildSecondaryIndexCompactJobSpec(Dataset dataset, Index index, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        return createOperationsHelper(dataset, index, recType, metaType, enforcedType, enforcedMetaType,
+                metadataProvider).buildCompactJobSpec();
+    }
+
+    // Shared construction of the secondary-index operations helper used by the create/load/compact jobs.
+    private static SecondaryIndexOperationsHelper createOperationsHelper(Dataset dataset, Index index,
+            ARecordType recType, ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        return SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider,
+                PHYSICAL_OPTIMIZATION_CONFIG, recType, metaType, enforcedType, enforcedMetaType);
+    }
+
+    // Shared body of dropJob and buildDropSecondaryIndexJobSpec (they were previously duplicated).
+    private static JobSpecification buildIndexDropJobSpec(Index index, MetadataProvider metadataProvider,
+            Dataset dataset) throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
+                        index.getIndexName(), temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        ARecordType recordType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                index, recordType, metaType, compactionInfo.first, compactionInfo.second);
+        // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
+        IndexDropOperatorDescriptor btreeDrop =
+                new IndexDropOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        dataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexDataflowHelperFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexDataflowHelperFactoryProvider.java
new file mode 100644
index 0000000..15aae94
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/InvertedIndexDataflowHelperFactoryProvider.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+
+/**
+ * Provides LSM inverted-index dataflow helper factories (partitioned or not, depending on the
+ * index type) for internal datasets with a single primary key and a single secondary key.
+ */
+public class InvertedIndexDataflowHelperFactoryProvider implements IIndexDataflowHelperFactoryProvider {
+    public static final InvertedIndexDataflowHelperFactoryProvider INSTANCE =
+            new InvertedIndexDataflowHelperFactoryProvider();
+
+    private InvertedIndexDataflowHelperFactoryProvider() {
+    }
+
+    @Override
+    public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset,
+            Index index, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
+        // Get basic info
+        List<List<String>> primaryKeys = DatasetUtil.getPartitioningKeys(dataset);
+        List<List<String>> secondaryKeys = index.getKeyFieldNames();
+        List<String> filterFieldName = DatasetUtil.getFilterField(dataset);
+        int numPrimaryKeys = primaryKeys.size();
+        int numSecondaryKeys = secondaryKeys.size();
+        // Validate: inverted indexes only support internal datasets with simple (non-composite) keys.
+        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
+            throw new CompilationException(ErrorCode.COMPILATION_INDEX_TYPE_NOT_SUPPORTED_FOR_DATASET_TYPE,
+                    index.getIndexType().name(), dataset.getDatasetType());
+        }
+        if (numPrimaryKeys > 1) {
+            throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
+        }
+        if (numSecondaryKeys > 1) {
+            throw new AsterixException("Cannot create composite inverted index on multiple fields.");
+        }
+        boolean isPartitioned = index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+        // Partitioned inverted indexes carry an extra (length) field alongside the token field.
+        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+        int[] invertedIndexFields = null;
+        int[] secondaryFilterFieldsForNonBulkLoadOps = null;
+        int[] invertedIndexFieldsForNonBulkLoadOps = null;
+        int[] secondaryFilterFields = null;
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[filterFieldName.size()];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+            secondaryFilterFields = new int[filterFieldName.size()];
+            // The filter field sits right after the token + primary key fields. (The original
+            // expression "numTokenKeyPairFields - numPrimaryKeys + numPrimaryKeys" reduced to this.)
+            secondaryFilterFields[0] = numTokenKeyPairFields;
+        }
+        IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider();
+        if (!isPartitioned) {
+            return new LSMInvertedIndexDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                    mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index),
+                    storageComponentProvider.getIoOperationSchedulerProvider(),
+                    dataset.getIoOperationCallbackFactory(index),
+                    mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), invertedIndexFields,
+                    filterTypeTraits, filterCmpFactories, secondaryFilterFields,
+                    secondaryFilterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps,
+                    !dataset.getDatasetDetails().isTemp());
+        } else {
+            return new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                    mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index),
+                    storageComponentProvider.getIoOperationSchedulerProvider(),
+                    dataset.getIoOperationCallbackFactory(index),
+                    mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), invertedIndexFields,
+                    filterTypeTraits, filterCmpFactories, secondaryFilterFields,
+                    secondaryFilterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps,
+                    !dataset.getDatasetDetails().isTemp());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
new file mode 100644
index 0000000..e074241
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * Utilities for resolving the {@link IAType}s of primary and secondary index key fields,
+ * choosing between a dataset's main record type and its auxiliary meta record type.
+ */
+public class KeyFieldTypeUtil {
+
+    // Utility class; not meant to be instantiated.
+    private KeyFieldTypeUtil() {
+    }
+
+    /**
+     * Get the types of primary key (partitioning key) fields.
+     * <p>
+     * NOTE(review): the method name has a typo ("Partitoning"); kept as-is for API compatibility.
+     *
+     * @param dataset
+     *            the dataset to consider.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return a list of IATypes, one for each corresponding primary key field;
+     *         <code>null</code> for non-INTERNAL datasets.
+     * @throws AlgebricksException
+     */
+    public static List<IAType> getPartitoningKeyTypes(Dataset dataset, ARecordType recordType,
+            ARecordType metaRecordType) throws AlgebricksException {
+        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
+            return null;
+        }
+        InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
+        return getPartitioningKeyTypes(datasetDetails, recordType, metaRecordType);
+    }
+
+    /**
+     * Get the types of primary key (partitioning key) fields.
+     *
+     * @param datasetDetails
+     *            contains specific data structures for an internal dataset.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return a list of IATypes, one for each corresponding primary key field.
+     * @throws AlgebricksException
+     */
+    public static List<IAType> getPartitioningKeyTypes(InternalDatasetDetails datasetDetails, ARecordType recordType,
+            ARecordType metaRecordType) throws AlgebricksException {
+        List<Integer> keySourceIndicators = datasetDetails.getKeySourceIndicator();
+        List<List<String>> partitioningKeys = datasetDetails.getPartitioningKey();
+        return getKeyTypes(recordType, metaRecordType, partitioningKeys, keySourceIndicators);
+    }
+
+    /**
+     * Get the types of key fields for an index, either primary or secondary.
+     *
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @param keys
+     *            the list of key fields.
+     * @param keySourceIndicators
+     *            a list of integers to indicate that each key field is from the main record or the auxiliary meta
+     *            record.
+     * @return a list of IATypes, one for each corresponding index key field.
+     * @throws AlgebricksException
+     */
+    public static List<IAType> getKeyTypes(ARecordType recordType, ARecordType metaRecordType, List<List<String>> keys,
+            List<Integer> keySourceIndicators) throws AlgebricksException {
+        List<IAType> keyTypes = new ArrayList<>();
+        int index = 0;
+        for (List<String> partitioningKey : keys) {
+            // Resolve each (possibly nested) key path against the record type its indicator selects.
+            keyTypes.add(chooseSource(keySourceIndicators, index, recordType, metaRecordType)
+                    .getSubFieldType(partitioningKey));
+            ++index;
+        }
+        return keyTypes;
+    }
+
+    /**
+     * Get the types of BTree index key fields.
+     *
+     * @param index
+     *            the index to consider.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return a list of IATypes, one for each corresponding index key field.
+     * @throws AlgebricksException
+     */
+    public static List<IAType> getBTreeIndexKeyTypes(Index index, ARecordType recordType, ARecordType metaRecordType)
+            throws AlgebricksException {
+        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+        List<IAType> indexKeyTypes = new ArrayList<>();
+        for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
+            // Strip the nullable/unknowable wrapper; only the non-nullable type is kept.
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
+                    index.getKeyFieldNames().get(i), chooseSource(keySourceIndicators, i, recordType, metaRecordType));
+            indexKeyTypes.add(keyPairType.first);
+        }
+        return indexKeyTypes;
+    }
+
+    /**
+     * Get the types of RTree index key fields.
+     *
+     * @param index
+     *            the index to consider.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return a list of IATypes, one for each corresponding index key field.
+     * @throws AlgebricksException
+     */
+    public static List<IAType> getRTreeIndexKeyTypes(Index index, ARecordType recordType, ARecordType metaRecordType)
+            throws AlgebricksException {
+        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+        List<IAType> indexKeyTypes = new ArrayList<>();
+        ARecordType targetRecType = chooseSource(keySourceIndicators, 0, recordType, metaRecordType);
+        Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), targetRecType);
+        IAType keyType = keyPairType.first;
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+        // An RTree key decomposes into 2 * numDimensions scalar coordinates, all of the same nested type.
+        int numKeys = KeyFieldTypeUtil.getNumSecondaryKeys(index, targetRecType, metaRecordType);
+        for (int i = 0; i < numKeys; i++) {
+            indexKeyTypes.add(nestedKeyType);
+        }
+        return indexKeyTypes;
+    }
+
+    /**
+     * Get the number of secondary index keys.
+     *
+     * @param index
+     *            the index to consider.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return the number of secondary index keys.
+     * @throws AlgebricksException
+     */
+    public static int getNumSecondaryKeys(Index index, ARecordType recordType, ARecordType metaRecordType)
+            throws AlgebricksException {
+        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+        switch (index.getIndexType()) {
+            case BTREE:
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return index.getKeyFieldNames().size();
+            case RTREE:
+                // Each spatial key contributes two coordinates (min/max) per dimension.
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                        index.getKeyFieldNames().get(0),
+                        chooseSource(keySourceIndicators, 0, recordType, metaRecordType));
+                IAType keyType = keyPairType.first;
+                return NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag()) * 2;
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType());
+        }
+    }
+
+    /**
+     * Choose between the main record type and the auxiliary record type according to <code>keySourceIndicators</code>.
+     *
+     * @param keySourceIndicators
+     *            a list of integers, 0 means to choose <code>recordType</code> and 1
+     *            means to choose <code>metaRecordType</code>.
+     * @param index
+     *            the offset to consider.
+     * @param recordType
+     *            the main record type.
+     * @param metaRecordType
+     *            the auxiliary meta record type.
+     * @return the chosen record type.
+     */
+    public static ARecordType chooseSource(List<Integer> keySourceIndicators, int index, ARecordType recordType,
+            ARecordType metaRecordType) {
+        return keySourceIndicators.get(index) == 0 ? recordType : metaRecordType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
deleted file mode 100644
index 108cd33..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-
-public class KeyFieldTypeUtils {
-
-    /**
-     * Get the types of primary key (partitioning key) fields
-     *
-     * @param dataset,
-     *            the dataset to consider.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return a list of IATypes, one for each corresponding primary key field.
-     * @throws AsterixException
-     */
-    public static List<IAType> getPartitoningKeyTypes(Dataset dataset, ARecordType recordType,
-            ARecordType metaRecordType) throws AsterixException {
-        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
-            return null;
-        }
-        InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
-        return getPartitioningKeyTypes(datasetDetails, recordType, metaRecordType);
-    }
-
-    /**
-     * Get the types of primary key (partitioning key) fields
-     *
-     * @param datasetDetails,
-     *            contains specific data structures for an internal dataset.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return a list of IATypes, one for each corresponding primary key field.
-     * @throws AsterixException
-     */
-    public static List<IAType> getPartitioningKeyTypes(InternalDatasetDetails datasetDetails, ARecordType recordType,
-            ARecordType metaRecordType) throws AsterixException {
-        List<Integer> keySourceIndicators = datasetDetails.getKeySourceIndicator();
-        List<List<String>> partitioningKeys = datasetDetails.getPartitioningKey();
-        return getKeyTypes(recordType, metaRecordType, partitioningKeys, keySourceIndicators);
-    }
-
-    /**
-     * Get the types of key fields for an index, either primary or secondary.
-     *
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType,
-     *            the auxiliary meta record type.
-     * @param keys,
-     *            the list of key fields.
-     * @param keySourceIndicators,
-     *            a list of integers to indicate that each key field is from the main record or the auxiliary meta record.
-     * @return a list of IATypes, one for each corresponding index key field.
-     * @throws AsterixException
-     */
-    public static List<IAType> getKeyTypes(ARecordType recordType, ARecordType metaRecordType, List<List<String>> keys,
-            List<Integer> keySourceIndicators) throws AsterixException {
-        List<IAType> keyTypes = new ArrayList<>();
-        int index = 0;
-        for (List<String> partitioningKey : keys) {
-            keyTypes.add(chooseSource(keySourceIndicators, index, recordType, metaRecordType)
-                    .getSubFieldType(partitioningKey));
-            ++index;
-        }
-        return keyTypes;
-    }
-
-    /**
-     * Get the types of BTree index key fields
-     *
-     * @param index,
-     *            the index to consider.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return a list of IATypes, one for each corresponding index key field.
-     * @throws AsterixException
-     */
-    public static List<IAType> getBTreeIndexKeyTypes(Index index, ARecordType recordType, ARecordType metaRecordType)
-            throws AsterixException {
-        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
-        List<IAType> indexKeyTypes = new ArrayList<>();
-        for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
-                    index.getKeyFieldNames().get(i), chooseSource(keySourceIndicators, i, recordType, metaRecordType));
-            indexKeyTypes.add(keyPairType.first);
-        }
-        return indexKeyTypes;
-    }
-
-    /**
-     * Get the types of RTree index key fields
-     *
-     * @param index,
-     *            the index to consider.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return a list of IATypes, one for each corresponding index key field.
-     * @throws AsterixException
-     */
-    public static List<IAType> getRTreeIndexKeyTypes(Index index, ARecordType recordType, ARecordType metaRecordType)
-            throws AsterixException {
-        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
-        List<IAType> indexKeyTypes = new ArrayList<>();
-        ARecordType targetRecType = chooseSource(keySourceIndicators, 0, recordType, metaRecordType);
-        Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
-                index.getKeyFieldNames().get(0), targetRecType);
-        IAType keyType = keyPairType.first;
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        int numKeys = KeyFieldTypeUtils.getNumSecondaryKeys(index, targetRecType, metaRecordType);
-        for (int i = 0; i < numKeys; i++) {
-            indexKeyTypes.add(nestedKeyType);
-        }
-        return indexKeyTypes;
-    }
-
-    /**
-     * Get the number of secondary index keys.
-     *
-     * @param index,
-     *            the index to consider.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return the number of secondary index keys.
-     * @throws AsterixException
-     */
-    public static int getNumSecondaryKeys(Index index, ARecordType recordType, ARecordType metaRecordType)
-            throws AsterixException {
-        List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
-        switch (index.getIndexType()) {
-            case BTREE:
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                return index.getKeyFieldNames().size();
-            }
-            case RTREE: {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
-                        index.getKeyFieldNames().get(0),
-                        chooseSource(keySourceIndicators, 0, recordType, metaRecordType));
-                IAType keyType = keyPairType.first;
-                int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-                return numDimensions * 2;
-            }
-            default: {
-                throw new AsterixException("Unknown index kind: " + index.getIndexType());
-            }
-        }
-    }
-
-    /**
-     * Choose between the main record type and the auxiliary record type according to <code>keySourceIndicators</code>.
-     *
-     * @param keySourceIndicators,
-     *            a list of integers, 0 means to choose <code>recordType</code> and 1
-     *            means to choose <code>metaRecordType</code>.
-     * @param index,
-     *            the offset to consider.
-     * @param recordType,
-     *            the main record type.
-     * @param metaRecordType
-     *            the auxiliary meta record type.
-     * @return the chosen record type.
-     */
-    public static ARecordType chooseSource(List<Integer> keySourceIndicators, int index, ARecordType recordType,
-            ARecordType metaRecordType) {
-        return keySourceIndicators.get(index) == 0 ? recordType : metaRecordType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index f16919d..a9c721a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -27,7 +27,7 @@ import org.apache.asterix.metadata.entities.Dataverse;
 
 public class MetadataLockManager {
 
-    public static MetadataLockManager INSTANCE = new MetadataLockManager();
+    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
     private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
@@ -38,14 +38,14 @@ public class MetadataLockManager {
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
 
     private MetadataLockManager() {
-        dataversesLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        datasetsLocks = new ConcurrentHashMap<String, DatasetLock>();
-        functionsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        nodeGroupsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        feedsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        feedPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        compactionPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
-        dataTypeLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        dataversesLocks = new ConcurrentHashMap<>();
+        datasetsLocks = new ConcurrentHashMap<>();
+        functionsLocks = new ConcurrentHashMap<>();
+        nodeGroupsLocks = new ConcurrentHashMap<>();
+        feedsLocks = new ConcurrentHashMap<>();
+        feedPolicyLocks = new ConcurrentHashMap<>();
+        compactionPolicyLocks = new ConcurrentHashMap<>();
+        dataTypeLocks = new ConcurrentHashMap<>();
     }
 
     public void acquireDataverseReadLock(String dataverseName) {
@@ -408,8 +408,8 @@ public class MetadataLockManager {
         releaseDataverseReadLock(dataverseName);
     }
 
-    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
-            List<String> datasets) {
+    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName,
+            List<String> dataverses, List<String> datasets) {
         dataverses.add(dataverseName);
         datasets.add(datasetFullyQualifiedName);
         Collections.sort(dataverses);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
new file mode 100644
index 0000000..3133aba
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+public class MetadataUtil {
+    public static final int PENDING_NO_OP = 0;
+    public static final int PENDING_ADD_OP = 1;
+    public static final int PENDING_DROP_OP = 2;
+
+    private MetadataUtil() {
+    }
+
+    public static String pendingOpToString(int pendingOp) {
+        switch (pendingOp) {
+            case PENDING_NO_OP:
+                return "Pending No Operation";
+            case PENDING_ADD_OP:
+                return "Pending Add Operation";
+            case PENDING_DROP_OP:
+                return "Pending Drop Operation";
+            default:
+                return "Unknown Pending Operation";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
new file mode 100644
index 0000000..8859b9d
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+
+public class RTreeDataflowHelperFactoryProvider implements IIndexDataflowHelperFactoryProvider {
+
+    public static final RTreeDataflowHelperFactoryProvider INSTANCE = new RTreeDataflowHelperFactoryProvider();
+
+    private RTreeDataflowHelperFactoryProvider() {
+    }
+
+    protected RTreePolicyType rTreePolicyType() {
+        return RTreePolicyType.RTREE;
+    }
+
+    @Override
+    public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Dataset dataset,
+            Index index, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
+        if (index.getKeyFieldNames().size() != 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD,
+                    index.getKeyFieldNames().size(), index.getIndexType(), 1);
+        }
+        IAType spatialType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                index.getKeyFieldNames().get(0), recordType).first;
+        if (spatialType == null) {
+            throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND,
+                    StringUtils.join(index.getKeyFieldNames().get(0), '.'));
+        }
+        List<List<String>> primaryKeyFields = DatasetUtil.getPartitioningKeys(dataset);
+        int numPrimaryKeys = primaryKeyFields.size();
+        ITypeTraits[] primaryTypeTraits = null;
+        IBinaryComparatorFactory[] primaryComparatorFactories = null;
+        IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
+            primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+            List<Integer> indicators = null;
+            if (dataset.hasMetaPart()) {
+                indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                IAType keyType = (indicators == null || indicators.get(i) == 0)
+                        ? recordType.getSubFieldType(primaryKeyFields.get(i))
+                        : metaType.getSubFieldType(primaryKeyFields.get(i));
+                primaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider()
+                        .getBinaryComparatorFactory(keyType, true);
+                primaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(keyType);
+            }
+            primaryTypeTraits[numPrimaryKeys] =
+                    storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
+            if (dataset.hasMetaPart()) {
+                primaryTypeTraits[numPrimaryKeys + 1] =
+                        storageComponentProvider.getTypeTraitProvider().getTypeTrait(recordType);
+            }
+        }
+        boolean isPointMBR =
+                spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IBinaryComparatorFactory[] secondaryComparatorFactories =
+                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        IPrimitiveValueProviderFactory[] valueProviderFactories =
+                new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        ATypeTag keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            secondaryComparatorFactories[i] = storageComponentProvider.getComparatorFactoryProvider()
+                    .getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = storageComponentProvider.getTypeTraitProvider().getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = storageComponentProvider.getPrimitiveValueProviderFactory();
+
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = (dataset.getDatasetType() == DatasetType.INTERNAL)
+                    ? primaryTypeTraits[i] : IndexingConstants.getTypeTraits(i);
+        }
+        int[] rtreeFields = null;
+        if (filterTypeTraits != null && filterTypeTraits.length > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+        }
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            int[] secondaryFilterFields = (filterTypeTraits != null && filterTypeTraits.length > 0)
+                    ? new int[] { numNestedSecondaryKeyFields + numPrimaryKeys } : null;
+            IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree(
+                    secondaryTypeTraits, primaryComparatorFactories, secondaryComparatorFactories);
+            return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rTreePolicyType(),
+                    btreeCompFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                    mergePolicyFactory, mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index),
+                    storageComponentProvider.getIoOperationSchedulerProvider(),
+                    dataset.getIoOperationCallbackFactory(index),
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), rtreeFields,
+                    filterTypeTraits, filterCmpFactories, secondaryFilterFields, !dataset.getDatasetDetails().isTemp(),
+                    isPointMBR);
+        } else {
+            return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, rTreePolicyType(),
+                    ExternalIndexingOperations.getBuddyBtreeComparatorFactories(), mergePolicyFactory,
+                    mergePolicyProperties, dataset.getIndexOperationTrackerFactory(index),
+                    storageComponentProvider.getIoOperationSchedulerProvider(),
+                    dataset.getIoOperationCallbackFactory(index),
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+                    new int[] { numNestedSecondaryKeyFields },
+                    ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR);
+        }
+    }
+
+    private static IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree(
+            ITypeTraits[] secondaryTypeTraits, IBinaryComparatorFactory[] primaryComparatorFactories,
+            IBinaryComparatorFactory[] secondaryComparatorFactories) {
+        IBinaryComparatorFactory[] btreeCompFactories = new IBinaryComparatorFactory[secondaryTypeTraits.length];
+        int i = 0;
+        for (; i < secondaryComparatorFactories.length; i++) {
+            btreeCompFactories[i] = secondaryComparatorFactories[i];
+        }
+        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
new file mode 100644
index 0000000..63368c7
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
/**
 * Builds Hyracks job specifications for secondary BTree indexes: index creation,
 * (bulk) loading, and compaction. Handles both INTERNAL datasets (index over the
 * primary index) and EXTERNAL datasets (index over externally indexed data).
 */
public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelper {

    protected SecondaryBTreeOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
                enforcedMetaType);
    }

    /**
     * Builds the job that creates the secondary index file(s) on the NCs.
     * For INTERNAL datasets an LSM-BTree local resource is registered; for EXTERNAL
     * datasets an external BTree-with-buddy resource is registered instead.
     */
    @Override
    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        ILocalResourceFactoryProvider localResourceFactoryProvider;
        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            // prepare a LocalResourceMetadata which will be stored in NC's local resource repository
            LSMBTreeLocalResourceMetadataFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(
                    secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields, false,
                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
                    filterCmpFactories, secondaryBTreeFields, secondaryFilterFields,
                    dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
                    storageComponentProvider.getMetadataPageManagerFactory());
            localResourceFactoryProvider =
                    new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
        } else {
            // External dataset local resource and dataflow helper.
            // NOTE(review): "buddyBreeFields" looks like a typo of buddyBTreeFields (local name only).
            int[] buddyBreeFields = new int[] { index.getKeyFieldNames().size() };
            ExternalBTreeWithBuddyLocalResourceMetadataFactory localResourceMetadata =
                    new ExternalBTreeWithBuddyLocalResourceMetadataFactory(dataset.getDatasetId(),
                            secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory,
                            mergePolicyFactoryProperties, buddyBreeFields,
                            dataset.getIndexOperationTrackerFactory(index),
                            dataset.getIoOperationCallbackFactory(index),
                            storageComponentProvider.getMetadataPageManagerFactory());
            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
                    LocalResource.ExternalBTreeWithBuddyResource);
        }
        // Single operator: create the on-disk index structure on every partition.
        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp =
                new TreeIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
                        secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields,
                        indexDataflowHelperFactory, localResourceFactoryProvider,
                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
                                IndexOperation.CREATE, null),
                        storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                secondaryPartitionConstraint);
        spec.addRoot(secondaryIndexCreateOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }

    /**
     * Builds the loading job for the secondary index. The pipeline is:
     * scan source -> (optional cast) -> assign secondary keys -> (optional null-filter
     * select) -> sort on secondary keys -> bulk load. The EXTERNAL branch scans the
     * external data via an indexing scan; the INTERNAL branch scans the primary index.
     */
    @Override
    public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
        int[] fieldPermutation = createFieldPermutationForBulkLoadOp(index.getKeyFieldNames().size());
        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
                index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
            /*
             * In case of external data,
             * this method is used to build loading jobs for both initial load on index creation
             * and transaction load on dataset refresh
             */

            // Create external indexing scan operator
            ExternalScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);

            // Assign op. A cast is inserted first when key types are enforced and the
            // enforced type differs from the stored item type.
            AbstractOperatorDescriptor sourceOp = primaryScanOp;
            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                sourceOp = createCastOp(spec, dataset.getDatasetType());
                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
            }
            AlgebricksMetaOperatorDescriptor asterixAssignOp =
                    createExternalAssignOp(spec, index.getKeyFieldNames().size(), secondaryRecDesc);

            // If any of the secondary fields are nullable, then add a select op that filters nulls.
            AlgebricksMetaOperatorDescriptor selectOp = null;
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                selectOp = createFilterNullsSelectOp(spec, index.getKeyFieldNames().size(), secondaryRecDesc);
            }

            // Sort by secondary keys.
            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
            // Create secondary BTree bulk load op.
            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
            IOperatorDescriptor root;
            if (externalFiles != null) {
                // Transaction load (dataset refresh): bulk-modify against existing index.
                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation, dataflowHelperFactory,
                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
                root = secondaryBulkLoadOp;
            } else {
                // Initial load: plain bulk load, terminated by a sink meta operator.
                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory,
                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
                        new RecordDescriptor[] { secondaryRecDesc });
                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
                root = metaOp;
            }
            // Wire the pipeline together; the select op is only present when nulls may occur.
            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
            } else {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
            }
            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
            spec.addRoot(root);
            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
            return spec;
        } else {
            // Create dummy key provider for feeding the primary index scan.
            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);

            // Create primary index scan op.
            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);

            // Assign op (optionally preceded by a cast, as in the external branch).
            AbstractOperatorDescriptor sourceOp = primaryScanOp;
            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                sourceOp = createCastOp(spec, dataset.getDatasetType());
                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
            }
            AlgebricksMetaOperatorDescriptor asterixAssignOp =
                    createAssignOp(spec, index.getKeyFieldNames().size(), secondaryRecDesc);

            // If any of the secondary fields are nullable, then add a select op that filters nulls.
            AlgebricksMetaOperatorDescriptor selectOp = null;
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                selectOp = createFilterNullsSelectOp(spec, index.getKeyFieldNames().size(), secondaryRecDesc);
            }

            // Sort by secondary keys.
            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
            // Create secondary BTree bulk load op.
            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
                    dataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);

            // Sink meta operator terminates the pipeline.
            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
                    new RecordDescriptor[] { secondaryRecDesc });
            // Connect the operators.
            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
            } else {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
            }
            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
            spec.addRoot(metaOp);
            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
            return spec;
        }
    }

    // For a BTree index, the number of secondary keys equals the number of declared key fields.
    @Override
    protected int getNumSecondaryKeys() {
        return index.getKeyFieldNames().size();
    }

    /**
     * Builds the job that triggers a full merge (compaction) of the secondary index's
     * LSM components on every partition.
     */
    @Override
    public JobSpecification buildCompactJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        LSMTreeIndexCompactOperatorDescriptor compactOp;
        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
                index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        compactOp =
                new LSMTreeIndexCompactOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
                        secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields,
                        dataflowHelperFactory,
                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
                                IndexOperation.FULL_MERGE, null),
                        storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
                secondaryPartitionConstraint);
        spec.addRoot(compactOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }

    /**
     * Populates the secondary record descriptor, comparator factories, type traits,
     * bloom-filter key fields, and field-access evaluator factories. Secondary-key
     * fields occupy positions [0, numSecondaryKeys); primary-key (or RID, for external
     * datasets) fields follow; an optional filter field comes last.
     */
    @Override
    @SuppressWarnings("rawtypes")
    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
        int numSecondaryKeys = index.getKeyFieldNames().size();
        secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
        secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
        ISerializerDeserializer[] secondaryRecFields =
                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields];
        // Enforced layout: primary keys, then the record, then (optionally) the meta record and filter.
        ISerializerDeserializer[] enforcedRecFields =
                new ISerializerDeserializer[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields];
        ITypeTraits[] enforcedTypeTraits =
                new ITypeTraits[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields];
        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
        ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
        IBinaryComparatorFactoryProvider comparatorFactoryProvider =
                metadataProvider.getFormat().getBinaryComparatorFactoryProvider();
        // Record column is 0 for external datasets, numPrimaryKeys for internal ones
        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
        for (int i = 0; i < numSecondaryKeys; i++) {
            ARecordType sourceType;
            int sourceColumn;
            // Source indicator 0 (or absent) means the key comes from the record;
            // otherwise it comes from the meta record in the next column.
            List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
            if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
                sourceType = itemType;
                sourceColumn = recordColumn;
            } else {
                sourceType = metaType;
                sourceColumn = recordColumn + 1;
            }
            secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
                    isEnforcingKeyTypes ? enforcedItemType : sourceType, index.getKeyFieldNames().get(i),
                    sourceColumn);
            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
                    index.getKeyFieldNames().get(i), sourceType);
            IAType keyType = keyTypePair.first;
            // keyTypePair.second is true when the open-field key is nullable.
            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
            secondaryRecFields[i] = keySerde;
            secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
            secondaryBloomFilterKeyFields[i] = i;
        }
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            // Add serializers and comparators for primary index fields.
            for (int i = 0; i < numPrimaryKeys; i++) {
                secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
                secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
                secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
            }
        } else {
            // Add serializers and comparators for RID fields.
            for (int i = 0; i < numPrimaryKeys; i++) {
                secondaryRecFields[numSecondaryKeys + i] = IndexingConstants.getSerializerDeserializer(i);
                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
                secondaryTypeTraits[numSecondaryKeys + i] = IndexingConstants.getTypeTraits(i);
                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
                secondaryComparatorFactories[numSecondaryKeys + i] = IndexingConstants.getComparatorFactory(i);
            }
        }
        // The record itself sits right after the primary keys in the enforced layout.
        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
        enforcedTypeTraits[numPrimaryKeys] = typeTraitProvider.getTypeTrait(itemType);
        if (dataset.hasMetaPart()) {
            enforcedRecFields[numPrimaryKeys + 1] = serdeProvider.getSerializerDeserializer(metaType);
            enforcedTypeTraits[numPrimaryKeys + 1] = typeTraitProvider.getTypeTrait(metaType);
        }

        if (numFilterFields > 0) {
            // The filter field is evaluated from the record and appended last.
            secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
            IAType type = keyTypePair.first;
            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
            enforcedRecFields[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = serde;
            enforcedTypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] =
                    typeTraitProvider.getTypeTrait(type);
        }
        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);

    }

    /**
     * Returns the identity permutation over all bulk-load fields
     * (secondary keys + primary keys + optional filter fields).
     */
    protected int[] createFieldPermutationForBulkLoadOp(int numSecondaryKeyFields) {
        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
        for (int i = 0; i < fieldPermutation.length; i++) {
            fieldPermutation[i] = i;
        }
        return fieldPermutation;
    }
}


Mime
View raw message