asterixdb-commits mailing list archives

From buyin...@apache.org
Subject [11/28] asterixdb git commit: Introduce IStorageComponentProvider
Date Thu, 02 Feb 2017 18:24:20 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
new file mode 100644
index 0000000..f7e569c
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.utils;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+@SuppressWarnings("rawtypes")
+// TODO: We should eventually have a hierarchy of classes that can create all
+// possible index job specs, not just for creation.
+public abstract class SecondaryIndexOperationsHelper {
+    protected final PhysicalOptimizationConfig physOptConf;
+    protected final MetadataProvider metadataProvider;
+    protected final Dataset dataset;
+    protected final Index index;
+    protected final ARecordType itemType;
+    protected final ARecordType metaType;
+    protected final ARecordType enforcedItemType;
+    protected final ARecordType enforcedMetaType;
+    protected ISerializerDeserializer metaSerde;
+    protected ISerializerDeserializer payloadSerde;
+    protected IFileSplitProvider primaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+    protected IFileSplitProvider secondaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
+    protected boolean anySecondaryKeyIsNullable = false;
+    protected long numElementsHint;
+    protected IBinaryComparatorFactory[] primaryComparatorFactories;
+    protected int[] primaryBloomFilterKeyFields;
+    protected RecordDescriptor primaryRecDesc;
+    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+    protected ITypeTraits[] secondaryTypeTraits;
+    protected int[] secondaryBloomFilterKeyFields;
+    protected RecordDescriptor secondaryRecDesc;
+    protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories;
+    protected IPropertiesProvider propertiesProvider;
+    protected ILSMMergePolicyFactory mergePolicyFactory;
+    protected Map<String, String> mergePolicyFactoryProperties;
+    protected RecordDescriptor enforcedRecDesc;
+    protected int numFilterFields;
+    protected List<String> filterFieldName;
+    protected ITypeTraits[] filterTypeTraits;
+    protected IBinaryComparatorFactory[] filterCmpFactories;
+    protected int[] secondaryFilterFields;
+    protected int[] primaryFilterFields;
+    protected int[] primaryBTreeFields;
+    protected int[] secondaryBTreeFields;
+    protected List<ExternalFile> externalFiles;
+    protected int numPrimaryKeys;
+
+    // Prevent public construction. Should be created via createIndexOperationsHelper().
+    protected SecondaryIndexOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
+            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
+        this.dataset = dataset;
+        this.index = index;
+        this.physOptConf = physOptConf;
+        this.propertiesProvider = propertiesProvider;
+        this.metadataProvider = metadataProvider;
+        this.itemType = recType;
+        this.metaType = metaType;
+        this.enforcedItemType = enforcedType;
+        this.enforcedMetaType = enforcedMetaType;
+    }
+
+    public static SecondaryIndexOperationsHelper createIndexOperationsHelper(Dataset dataset, Index index,
+            MetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) throws AlgebricksException {
+        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
+        SecondaryIndexOperationsHelper indexOperationsHelper;
+        switch (index.getIndexType()) {
+            case BTREE:
+                indexOperationsHelper =
+                        new SecondaryBTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider,
+                                metadataProvider, recType, metaType, enforcedType, enforcedMetaType);
+                break;
+            case RTREE:
+                indexOperationsHelper =
+                        new SecondaryRTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider,
+                                metadataProvider, recType, metaType, enforcedType, enforcedMetaType);
+                break;
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(dataset, index, physOptConf,
+                        asterixPropertiesProvider, metadataProvider, recType, metaType, enforcedType,
+                        enforcedMetaType);
+                break;
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType());
+        }
+        indexOperationsHelper.init();
+        return indexOperationsHelper;
+    }
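+    // Illustrative usage sketch (names as in this file): obtain a helper from the
+    // factory above, then build the desired job spec, e.g.:
+    //   SecondaryIndexOperationsHelper helper = SecondaryIndexOperationsHelper
+    //           .createIndexOperationsHelper(dataset, index, metadataProvider,
+    //                   physOptConf, recType, metaType, enforcedType, enforcedMetaType);
+    //   JobSpecification loadingSpec = helper.buildLoadingJobSpec();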
+
+    public abstract JobSpecification buildCreationJobSpec() throws AlgebricksException;
+
+    public abstract JobSpecification buildLoadingJobSpec() throws AlgebricksException;
+
+    public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException;
+
+    protected void init() throws AlgebricksException {
+        payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        metaSerde =
+                metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
+                        index.getIndexName(), dataset.getDatasetDetails().isTemp());
+        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+        numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            filterFieldName = DatasetUtil.getFilterField(dataset);
+            if (filterFieldName != null) {
+                numFilterFields = 1;
+            } else {
+                numFilterFields = 0;
+            }
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
+                    metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
+                            dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
+            primaryFileSplitProvider = primarySplitsAndConstraint.first;
+            primaryPartitionConstraint = primarySplitsAndConstraint.second;
+            setPrimaryRecDescAndComparators();
+        }
+        setSecondaryRecDescAndComparators();
+        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        mergePolicyFactory = compactionInfo.first;
+        mergePolicyFactoryProperties = compactionInfo.second;
+        if (numFilterFields > 0) {
+            setFilterTypeTraitsAndComparators();
+        }
+    }
+
+    protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
+        filterTypeTraits = new ITypeTraits[numFilterFields];
+        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
+        secondaryFilterFields = new int[numFilterFields];
+        primaryFilterFields = new int[numFilterFields];
+        primaryBTreeFields = new int[numPrimaryKeys + 1];
+        secondaryBTreeFields = new int[index.getKeyFieldNames().size() + numPrimaryKeys];
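+        // Both arrays are identity permutations: the primary index stores
+        // [primary keys, payload] and the secondary index stores
+        // [secondary keys, primary keys]; the filter field follows these.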
+        for (int i = 0; i < primaryBTreeFields.length; i++) {
+            primaryBTreeFields[i] = i;
+        }
+        for (int i = 0; i < secondaryBTreeFields.length; i++) {
+            secondaryBTreeFields[i] = i;
+        }
+
+        IAType type = itemType.getSubFieldType(filterFieldName);
+        filterCmpFactories[0] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
+        filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
+        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
+        primaryFilterFields[0] = numPrimaryKeys + 1;
+    }
+
+    protected abstract int getNumSecondaryKeys();
+
+    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        ISerializerDeserializer[] primaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
+        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        List<Integer> indicators = null;
+        if (dataset.hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+        }
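+        // A key source indicator of 0 means the partitioning key comes from the
+        // record part; otherwise it comes from the meta part.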
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType =
+                    (indicators == null || indicators.get(i) == 0) ? itemType.getSubFieldType(partitioningKeys.get(i))
+                            : metaType.getSubFieldType(partitioningKeys.get(i));
+            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
+            primaryComparatorFactories[i] =
+                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            primaryBloomFilterKeyFields[i] = i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        if (dataset.hasMetaPart()) {
+            primaryRecFields[numPrimaryKeys + 1] = metaSerde;
+            primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
+
+    protected abstract void setSecondaryRecDescAndComparators() throws AlgebricksException;
+
+    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
+        // Build a dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
+
+    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec)
+            throws AlgebricksException {
+        // Null low/high key fields denote -Infinity/+Infinity, i.e. an unbounded
+        // range that scans the entire primary index.
+        int[] lowKeyFields = null;
+        int[] highKeyFields = null;
+        ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
+        JobId jobId = JobIdFactory.generateJobId();
+        metadataProvider.setJobId(jobId);
+        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());
+
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
+                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
+                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
+                dataset.getIndexDataflowHelperFactory(metadataProvider, primaryIndex, itemType, metaType,
+                        mergePolicyFactory, mergePolicyFactoryProperties),
+                false, false, null, searchCallbackFactory, null, null,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primaryPartitionConstraint);
+        return primarySearchOp;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec, int numSecondaryKeyFields,
+            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
+        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
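+        // The evaluated secondary key (and filter) values land in new columns right
+        // after the primary keys; the projection then emits
+        // [secondary keys, primary keys, optional filter field].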
+        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
+            outColumns[i] = numPrimaryKeys + i;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+        if (numFilterFields > 0) {
+            projectionList[projCount] = numPrimaryKeys + numSecondaryKeyFields;
+        }
+
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = secondaryFieldAccessEvalFactories[i];
+        }
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                primaryPartitionConstraint);
+        return asterixAssignOp;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) {
+        CastTypeDescriptor castFuncDesc = (CastTypeDescriptor) CastTypeDescriptor.FACTORY.createFunctionDescriptor();
+        castFuncDesc.setImmutableStates(enforcedItemType, itemType);
+
+        int[] outColumns = new int[1];
+        int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + numPrimaryKeys];
+        int recordIdx;
+        // The external data-scan operator returns the record as the first field,
+        // whereas in the internal case the record is the last field.
+        if (dsType == DatasetType.EXTERNAL) {
+            recordIdx = 0;
+            outColumns[0] = 0;
+        } else {
+            recordIdx = numPrimaryKeys;
+            outColumns[0] = numPrimaryKeys;
+        }
+        for (int i = 0; i <= numPrimaryKeys; i++) {
+            projectionList[i] = i;
+        }
+        if (dataset.hasMetaPart()) {
+            projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
+        }
+        IScalarEvaluatorFactory[] castEvalFact =
+                new IScalarEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
+        sefs[0] = castFuncDesc.createEvaluatorFactory(castEvalFact);
+        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { castAssign },
+                new RecordDescriptor[] { enforcedRecDesc });
+    }
+
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+            throws AlgebricksException {
+        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
+
+    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields,
+            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
+        IScalarEvaluatorFactory[] andArgsEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeyFields];
+        NotDescriptor notDesc = new NotDescriptor();
+        IsUnknownDescriptor isUnknownDesc = new IsUnknownDescriptor();
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            // Access column i, and apply 'is not null'.
+            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
+            IScalarEvaluatorFactory isUnknownEvalFactory =
+                    isUnknownDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { columnAccessEvalFactory });
+            IScalarEvaluatorFactory notEvalFactory =
+                    notDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { isUnknownEvalFactory });
+            andArgsEvalFactories[i] = notEvalFactory;
+        }
+        IScalarEvaluatorFactory selectCond;
+        if (numSecondaryKeyFields > 1) {
+            // Create a conjunctive condition in which every secondary index key
+            // must satisfy 'is not null'.
+            AndDescriptor andDesc = new AndDescriptor();
+            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+        } else {
+            selectCond = andArgsEvalFactories[0];
+        }
+        StreamSelectRuntimeFactory select =
+                new StreamSelectRuntimeFactory(selectCond, null, BinaryBooleanInspector.FACTORY, false, -1, null);
+        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
+                primaryPartitionConstraint);
+        return asterixSelectOp;
+    }
+
+    // This method creates a source indexing operator for external data
+    protected ExternalScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
+            throws AlgebricksException {
+        // A record + primary keys
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
+        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        // payload serde and type traits for the record slot
+        serdes[0] = payloadSerde;
+        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        // Serdes and type traits for RID fields.
+        for (int i = 1; i < serdes.length; i++) {
+            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
+            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
+        }
+        // output record desc
+        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);
+
+        // Create the operator and its partition constraints
+        Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
+        try {
+            indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
+                    dataset, itemType, indexerDesc, externalFiles);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+                indexingOpAndConstraints.second);
+
+        // Set the primary partition constraint to this operator's partition constraint
+        primaryPartitionConstraint = indexingOpAndConstraints.second;
+        return indexingOpAndConstraints.first;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
+            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
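+        // The external indexing input is [record, RIDs...], so the evaluated
+        // secondary keys go into new columns appended after the input fields,
+        // and the projection emits [secondary keys, RIDs].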
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            outColumns[i] = i + numPrimaryKeys + 1;
+            projectionList[i] = i + numPrimaryKeys + 1;
+        }
+
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = secondaryFieldAccessEvalFactories[i];
+        }
+        // Add external RIDs to the projection list
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[numSecondaryKeys + i] = i + 1;
+        }
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { assign },
+                new RecordDescriptor[] { secondaryRecDesc });
+    }
+
+    protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
+            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+            throws AlgebricksException {
+        // Build the list of file numbers for files pending deletion
+        int numOfDeletedFiles = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                numOfDeletedFiles++;
+            }
+        }
+        int[] deletedFiles = new int[numOfDeletedFiles];
+        int i = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                deletedFiles[i++] = file.getFileNumber();
+            }
+        }
+        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor(
+                spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE,
+                deletedFiles, fieldPermutation, fillFactor, numElementsHint,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
+
+    public List<ExternalFile> getExternalFiles() {
+        return externalFiles;
+    }
+
+    public void setExternalFiles(List<ExternalFile> externalFiles) {
+        this.externalFiles = externalFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
new file mode 100644
index 0000000..b86004a
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    private IAType secondaryKeyType;
+    private ITypeTraits[] invListsTypeTraits;
+    private IBinaryComparatorFactory[] tokenComparatorFactories;
+    private ITypeTraits[] tokenTypeTraits;
+    private IBinaryTokenizerFactory tokenizerFactory;
+    // For tokenization, sorting and loading. Represents <token, primary keys>.
+    private int numTokenKeyPairFields;
+    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
+    private RecordDescriptor tokenKeyPairRecDesc;
+    private boolean isPartitioned;
+    private int[] invertedIndexFields;
+    private int[] invertedIndexFieldsForNonBulkLoadOps;
+    private int[] secondaryFilterFieldsForNonBulkLoadOps;
+
+    protected SecondaryInvertedIndexOperationsHelper(Dataset dataset, Index index,
+            PhysicalOptimizationConfig physOptConf, IPropertiesProvider propertiesProvider,
+            MetadataProvider metadataProvider, ARecordType recType, ARecordType metaType, ARecordType enforcedType,
+            ARecordType enforcedMetaType) {
+        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
+                enforcedMetaType);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new CompilationException(
+                    ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX, indexType,
+                    RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys,
+                    indexType, 1);
+        }
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
+        // Prepare the record descriptor used in the assign op and the optional
+        // select op.
+        secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields =
+                new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
+        if (numSecondaryKeys > 0) {
+            secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
+                    isEnforcingKeyTypes ? enforcedItemType : itemType, index.getKeyFieldNames().get(0),
+                    numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                    index.getKeyFieldNames().get(0), itemType);
+            secondaryKeyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            secondaryRecFields[0] = keySerde;
+            secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
+        }
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType,
+                index.getGramLength());
+        // Type traits for inverted-list elements. Inverted lists contain
+        // primary keys.
+        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
+        if (numPrimaryKeys > 0) {
+            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
+            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+        }
+        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+        ISerializerDeserializer[] tokenKeyPairFields =
+                new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
+        ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
+        tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
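+        // pkOff is the offset of the first primary key in the <token, key> pair:
+        // 1 in the unpartitioned case, 2 when the partitioning short is present.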
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
+        if (numPrimaryKeys > 0) {
+            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
+            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
+            tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
+        }
+        if (numFilterFields > 0) {
+            tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
+        }
+        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+        }
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numTokenKeyPairFields - numPrimaryKeys;
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        // Prepare a LocalResourceMetadata that will be stored in the NC's local
+        // resource repository
+        IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits,
+                primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
+                dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
+                filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
+                invertedIndexFieldsForNonBulkLoadOps, dataset.getIndexOperationTrackerFactory(index),
+                dataset.getIoOperationCallbackFactory(index),
+                storageComponentProvider.getMetadataPageManagerFactory());
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMInvertedIndexResource);
+
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp =
+                new LSMInvertedIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(),
+                        tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories,
+                        tokenizerFactory, dataflowHelperFactory, localResourceFactoryProvider,
+                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
+                                IndexOperation.CREATE, null),
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(invIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+
+        // Create dummy key provider for feeding the primary index scan.
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+        // Create primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+        AbstractOperatorDescriptor sourceOp = primaryScanOp;
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
+            sourceOp = createCastOp(spec, dataset.getDatasetType());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+        }
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, numSecondaryKeys, secondaryRecDesc);
+
+        // If any of the secondary fields are nullable, then add a select op
+        // that filters nulls.
+        AlgebricksMetaOperatorDescriptor selectOp = null;
+        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
+        }
+
+        // Create a tokenizer op.
+        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
+
+        // Sort by token + primary keys.
+        ExternalSortOperatorDescriptor sortOp =
+                createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
+
+        // Create secondary inverted index bulk load op.
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
+        } else {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
+        }
+        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
+        int docField = 0;
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
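+        // The assign output is [secondary key, primary keys, optional filter], so
+        // the fields to carry through the tokenizer start after the secondary keys.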
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = numSecondaryKeys + i;
+        }
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+                tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+                primaryPartitionConstraint);
+        return tokenizerOp;
+    }
+
+    @Override
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        // Sort on token and primary keys.
+        int[] sortFields = new int[numTokenKeyPairFields];
+        for (int i = 0; i < numTokenKeyPairFields; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec)
+            throws AlgebricksException {
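+        // The sorted stream is already in index field order
+        // [token(, partition), primary keys(, filter)], so the bulk load uses the
+        // identity permutation.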
+        int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            fieldPermutation[i] = i;
+        }
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
+                spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
+                storageComponentProvider.getStorageManager(), secondaryFileSplitProvider,
+                storageComponentProvider.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
+                storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return invIndexBulkLoadOp;
+    }
+
+    private IIndexDataflowHelperFactory createDataflowHelperFactory() throws AlgebricksException {
+        return dataset.getIndexDataflowHelperFactory(metadataProvider, index, itemType, metaType, mergePolicyFactory,
+                mergePolicyFactoryProperties);
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        LSMInvertedIndexCompactOperator compactOp =
+                new LSMInvertedIndexCompactOperator(spec, storageComponentProvider.getStorageManager(),
+                        secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(),
+                        tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories,
+                        tokenizerFactory, dataflowHelperFactory,
+                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
+                                IndexOperation.FULL_MERGE, null),
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
new file mode 100644
index 0000000..460b635
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    protected ATypeTag keyType;
+    protected int[] primaryKeyFields;
+    protected int[] rtreeFields;
+    protected boolean isPointMBR;
+    protected RecordDescriptor secondaryRecDescForPointMBR = null;
+
+    protected SecondaryRTreeOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
+            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
+        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
+                enforcedMetaType);
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        ILocalResourceFactoryProvider localResourceFactoryProvider;
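+        // Choose the local resource metadata by dataset type: an LSM RTree resource
+        // for internal datasets, an external RTree resource for external datasets.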
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree();
+            // Prepare a LocalResourceMetadata which will be stored in the NC's local resource repository
+            IResourceFactory localResourceMetadata = new LSMRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
+                    secondaryComparatorFactories, btreeCompFactories, valueProviderFactories, RTreePolicyType.RTREE,
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
+                    filterCmpFactories, rtreeFields, primaryKeyFields, secondaryFilterFields, isPointMBR,
+                    dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
+                    storageComponentProvider.getMetadataPageManagerFactory());
+            localResourceFactoryProvider =
+                    new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMRTreeResource);
+        } else {
+            // External dataset
+            // Prepare a LocalResourceMetadata which will be stored in the NC's local resource repository
+            IResourceFactory localResourceMetadata = new ExternalRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
+                    secondaryComparatorFactories, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
+                    valueProviderFactories, RTreePolicyType.RTREE,
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, primaryKeyFields,
+                    isPointMBR, dataset.getIndexOperationTrackerFactory(index),
+                    dataset.getIoOperationCallbackFactory(index),
+                    storageComponentProvider.getMetadataPageManagerFactory());
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.ExternalRTreeResource);
+        }
+
+        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp =
+                new TreeIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
+                        secondaryTypeTraits, secondaryComparatorFactories, null, indexDataflowHelperFactory,
+                        localResourceFactoryProvider,
+                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
+                                IndexOperation.CREATE, null),
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(secondaryIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
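+    /*
+     * The deleted-key BTree is sorted on all secondary-key fields followed by all
+     * primary-key fields, so its comparator factories are the concatenation of the
+     * two arrays. Illustrative sketch: with secondary comparators [sk0, sk1] and
+     * primary comparators [pk0], the result is [sk0, sk1, pk0].
+     */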
+    private IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree() {
+        IBinaryComparatorFactory[] btreeCompFactories = new IBinaryComparatorFactory[secondaryTypeTraits.length];
+        int i = 0;
+        for (; i < secondaryComparatorFactories.length; i++) {
+            btreeCompFactories[i] = secondaryComparatorFactories[i];
+        }
+        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numNestedSecondaryKeyFields;
+    }
+
+    @Override
+    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                    + "There can be only one field as a key for the R-tree index.");
+        }
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
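+        // e.g., a 2D spatial type has 2 dimensions, so its MBR contributes 4 nested
+        // key fields (a min and a max per dimension).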
+        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
+        secondaryFieldAccessEvalFactories =
+                metadataProvider.getFormat().createMBRFactory(isEnforcingKeyTypes ? enforcedItemType : itemType,
+                        secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields];
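+        // Secondary record layout: [nested secondary keys | primary keys | optional filter field].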
+        ISerializerDeserializer[] enforcedRecFields =
+                new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] =
+                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] =
+                    metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory();
+        }
+        // Add serializers and comparators for primary index fields.
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+            }
+        } else {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
+                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
+                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
+            }
+        }
+        enforcedRecFields[numPrimaryKeys] =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        if (numFilterFields > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+
+            Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = typePair.first;
+            ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
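+        // Primary key fields follow the nested secondary keys in the secondary record,
+        // e.g., with 4 nested secondary key fields and 1 primary key, primaryKeyFields = [4].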
+        primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+        if (isPointMBR) {
+            int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2;
+            ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys
+                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
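+            // e.g., a 2D point with one primary key and no filter fields yields
+            // 3 fields: 2 coordinate fields + 1 primary key field.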
+            int idx = 0;
+            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
+            }
+            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i];
+            }
+            secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR);
+        }
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        /***************************************************
+         * [ About PointMBR Optimization ]
+         * Instead of storing an MBR (4 doubles) for a point (2 doubles) in an RTree
+         * leaf node, the PointMBR concept is introduced:
+         * PointMBR is a way to store a point as 2 doubles in an RTree leaf node.
+         * This roughly halves the RTree index size.
+         * To fully benefit from the PointMBR concept, besides the RTree itself, the
+         * external sort operator used during bulk loading (from either data loading
+         * or index creation) must also deal with a point as 2 doubles instead of 4.
+         * Otherwise, the external sort processes twice as many doubles as it
+         * actually requires. For this purpose, PointMBR-specific optimization logic
+         * is added as follows:
+         * 1) The CreateMBR function in the assign operator generates 2 doubles
+         * instead of 4.
+         * 2) The external sort operator sorts points represented with 2 doubles.
+         * 3) Bulk loading in the RTree takes 4 doubles by reading the 2 doubles
+         * twice and then does the same work as in the non-point MBR case.
+         ***************************************************/
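+        // Illustrative sketch (hypothetical values): a point (3.0, 7.0) with primary
+        // key "pk1" flows through the assign and sort operators as [3.0, 7.0, "pk1"]
+        // rather than [3.0, 7.0, 3.0, 7.0, "pk1"]; the bulk-load operator later
+        // re-reads the two coordinates via the field permutation to rebuild the MBR.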
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
+        int numNestedSecondaryKeyFieldsConsideringPointMBR =
+                isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
+        RecordDescriptor secondaryRecDescConsideringPointMBR =
+                isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc;
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            // Create dummy key provider for feeding the primary index scan.
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+            // Assign op.
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec,
+                    numNestedSecondaryKeyFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFieldsConsideringPointMBR,
+                        secondaryRecDescConsideringPointMBR);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] {
+                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
+                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
+            // Create secondary RTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
+                    indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+            spec.addRoot(metaOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        } else {
+            // External dataset
+            /*
+             * For external data, this method is used to build the loading jobs for both
+             * the initial load on index creation and the transaction load on dataset
+             * refresh.
+             */
+            // Create external indexing scan operator
+            ExternalScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            // Assign op.
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec,
+                    numNestedSecondaryKeyFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFieldsConsideringPointMBR,
+                        secondaryRecDescConsideringPointMBR);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] {
+                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
+                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
+            // Create secondary RTree bulk load op.
+            IOperatorDescriptor root;
+            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
+            if (externalFiles != null) {
+                // Transaction load
+                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation,
+                        indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                root = secondaryBulkLoadOp;
+            } else {
+                // Initial load
+                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, indexDataflowHelperFactory,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
+                        new RecordDescriptor[] { secondaryRecDesc });
+                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+                root = metaOp;
+            }
+
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(root);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return spec;
+    }
+
+    protected int[] createFieldPermutationForBulkLoadOp(int numSecondaryKeyFields) {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
+        int numSecondaryKeyFieldsForPointMBR = numSecondaryKeyFields / 2;
+        int end = isPointMBR ? numSecondaryKeyFieldsForPointMBR : fieldPermutation.length;
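+        // Without PointMBR, this loop produces the identity permutation over all
+        // fields; with PointMBR, it fills only the first half of the secondary key
+        // slots, and the rest is filled in the if clause below.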
+        for (int i = 0; i < end; i++) {
+            fieldPermutation[i] = i;
+        }
+        if (isPointMBR) {
+            /*******************************************************************************
+             * For example, suppose that 2D point data is indexed with an RTree, there
+             * are no filter fields, and the primary key consists of a single field.
+             * ========== Without PointMBR optimization ==========
+             * Without the point type optimization, the input operator of the RTree's
+             * TreeIndexBulkLoadOperator delivers five variables to it as follows:
+             * [$var1, $var2, $var3, $var4, $var5]
+             * where $var1 ~ $var4 together represent the MBR of a point object.
+             * Since it is a point object, $var1 and $var3 always have identical values,
+             * as do $var2 and $var4.
+             * $var5 represents a primary key value.
+             * The fieldPermutation variable captures this order by holding the values:
+             * [0,1,2,3,4]
+             * =========== With PointMBR optimization ===========
+             * With PointMBR optimization, the input operator of the RTree's
+             * TreeIndexBulkLoadOperator delivers 3 variables to it as follows:
+             * [$var1, $var2, $var3]
+             * where $var1 and $var2 together represent the MBR of a point object.
+             * $var3 represents a primary key value.
+             * The fieldPermutation variable captures this order by holding the values:
+             * [0,1,0,1,2]
+             * This means that the bulk-load operator reads the pair $var1 and $var2
+             * twice in order to produce the same output as if no PointMBR optimization
+             * were applied. This adjustment is performed in this if clause.
+             *********************************************************************************/
+            int idx = numSecondaryKeyFieldsForPointMBR;
+            // Add the rest of the secondary key fields for PointMBR.
+            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = i;
+            }
+            // Add the primary key and filter fields.
+            end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + numFilterFields;
+            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
+                fieldPermutation[idx++] = i;
+            }
+        }
+        return fieldPermutation;
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                metadataProvider.getStorageComponentProvider().getStorageManager(),
+                metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(),
+                secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, indexDataflowHelperFactory,
+                dataset.getModificationCallbackFactory(metadataProvider.getStorageComponentProvider(), index, null,
+                        IndexOperation.FULL_MERGE, null),
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

