asterixdb-commits mailing list archives

From ima...@apache.org
Subject [06/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:43:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
new file mode 100644
index 0000000..ffecc8e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.external.IndexingConstants;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    protected SecondaryBTreeOperationsHelper(PhysicalOptimizationConfig physOptConf,
+            IAsterixPropertiesProvider propertiesProvider) {
+        super(physOptConf, propertiesProvider);
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
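+        // The creation job only instantiates empty secondary-index files on each partition:
+        // internal datasets get an LSM B-tree, external datasets an external B-tree with a
+        // buddy B-tree (used to track deleted keys).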
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        ILocalResourceFactoryProvider localResourceFactoryProvider;
+        IIndexDataflowHelperFactory indexDataflowHelperFactory;
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            // Prepare a LocalResourceMetadata which will be stored in the NC's local resource repository.
+            ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(secondaryTypeTraits,
+                    secondaryComparatorFactories, secondaryBloomFilterKeyFields, true, dataset.getDatasetId(),
+                    mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, filterCmpFactories,
+                    secondaryBTreeFields, secondaryFilterFields);
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.LSMBTreeResource);
+            // The index create operation should be persistent regardless of whether the dataset is temporary or permanent.
+            indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                    dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                    secondaryBTreeFields, secondaryFilterFields, true);
+        } else {
+            // External dataset local resource and dataflow helper
+            int[] buddyBTreeFields = new int[] { numSecondaryKeys };
+            ILocalResourceMetadata localResourceMetadata = new ExternalBTreeWithBuddyLocalResourceMetadata(
+                    dataset.getDatasetId(), secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory,
+                    mergePolicyFactoryProperties, buddyBTreeFields);
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.ExternalBTreeWithBuddyResource);
+            indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory,
+                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), buddyBTreeFields,
+                    ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        }
+        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, indexDataflowHelperFactory, localResourceFactoryProvider,
+                NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(secondaryIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            /*
+             * For external datasets, this method builds the loading jobs for both the
+             * initial load on index creation and the transaction load on dataset refresh.
+             */
+
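+            // Dataflow: external indexing scan -> [cast] -> assign (extract secondary keys)
+            //           -> [filter-nulls select] -> sort on secondary keys
+            //           -> bulk modify (transaction load) or bulk load + sink (initial load).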
+            // Create external indexing scan operator
+            ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
+
+            // Assign op.
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes) {
+                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numSecondaryKeys);
+
+            // If any secondary key is nullable or key types are enforced, add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
+
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            // Create secondary BTree bulk load op.
+            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
+            ExternalBTreeWithBuddyDataflowHelperFactory dataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
+                    mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys },
+                    ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+            IOperatorDescriptor root;
+            if (externalFiles != null) {
+                // Transaction load
+                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, numSecondaryKeys, dataflowHelperFactory,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                root = secondaryBulkLoadOp;
+            } else {
+                // Initial load
+                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys, dataflowHelperFactory,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
+                        new RecordDescriptor[] { secondaryRecDesc });
+                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+                root = metaOp;
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(root);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
+        } else {
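+            // Dataflow: dummy key provider -> primary index scan -> [cast] -> assign
+            //           (extract secondary keys) -> [filter-nulls select] -> sort
+            //           -> bulk load -> sink.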
+            // Create a dummy key provider for feeding the primary index scan.
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+            // Assign op.
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes) {
+                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys);
+
+            // If any secondary key is nullable or key types are enforced, add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
+
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            // Create secondary BTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numSecondaryKeys,
+                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            mergePolicyFactory, mergePolicyFactoryProperties,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+                                    .getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                            secondaryBTreeFields, secondaryFilterFields, !temp), GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+
+            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] { secondaryRecDesc });
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+            spec.addRoot(metaOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
+        }
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numSecondaryKeys;
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        LSMTreeIndexCompactOperatorDescriptor compactOp;
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
+                    secondaryComparatorFactories, secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(
+                            new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                            mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                            storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
+                            filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, !temp),
+                    NoOpOperationCallbackFactory.INSTANCE);
+        } else {
+            // External dataset
+            compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
+                    secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+                    new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, storageProperties
+                                    .getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys },
+                            ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true),
+                    NoOpOperationCallbackFactory.INSTANCE);
+        }
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
+            List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
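+        // Secondary-index tuples are laid out as [secondary keys | primary keys | filter
+        // fields]; the arrays below are sized and filled to match that layout.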
+        secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
+                + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+        IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+                .getBinaryComparatorFactoryProvider();
+        // Record column is 0 for external datasets, numPrimaryKeys for internal ones
+        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                    isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(i), recordColumn);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                    secondaryKeyFields.get(i), itemType);
+            IAType keyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+            secondaryBloomFilterKeyFields[i] = i;
+        }
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            // Add serializers and comparators for primary index fields.
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+                secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+                secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+            }
+        } else {
+            // Add serializers and comparators for RID fields.
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numSecondaryKeys + i] = IndexingConstants.getSerializerDeserializer(i);
+                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
+                secondaryTypeTraits[numSecondaryKeys + i] = IndexingConstants.getTypeTraits(i);
+                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
+                secondaryComparatorFactories[numSecondaryKeys + i] = IndexingConstants.getComparatorFactory(i);
+            }
+        }
+        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
new file mode 100644
index 0000000..07c8bab
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -0,0 +1,575 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.external.indexing.operators.ExternalIndexBulkModifyOperatorDescriptor;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.metadata.external.IndexingConstants;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+@SuppressWarnings("rawtypes")
+// TODO: We should eventually have a hierarchy of classes that can create all
+// possible index job specs, not just for creation.
+public abstract class SecondaryIndexOperationsHelper {
+    protected final PhysicalOptimizationConfig physOptConf;
+
+    protected int numPrimaryKeys;
+    protected int numSecondaryKeys;
+    protected AqlMetadataProvider metadataProvider;
+    protected String dataverseName;
+    protected String datasetName;
+    protected Dataset dataset;
+    protected ARecordType itemType;
+    protected ISerializerDeserializer payloadSerde;
+    protected IFileSplitProvider primaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+    protected IFileSplitProvider secondaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
+    protected String secondaryIndexName;
+    protected boolean anySecondaryKeyIsNullable = false;
+    protected boolean isEnforcingKeyTypes = false;
+
+    protected long numElementsHint;
+    protected IBinaryComparatorFactory[] primaryComparatorFactories;
+    protected int[] primaryBloomFilterKeyFields;
+    protected RecordDescriptor primaryRecDesc;
+    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+    protected ITypeTraits[] secondaryTypeTraits;
+    protected int[] secondaryBloomFilterKeyFields;
+    protected RecordDescriptor secondaryRecDesc;
+    protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
+
+    protected IAsterixPropertiesProvider propertiesProvider;
+    protected ILSMMergePolicyFactory mergePolicyFactory;
+    protected Map<String, String> mergePolicyFactoryProperties;
+    protected RecordDescriptor enforcedRecDesc;
+    protected ARecordType enforcedItemType;
+
+    protected int numFilterFields;
+    protected List<String> filterFieldName;
+    protected ITypeTraits[] filterTypeTraits;
+    protected IBinaryComparatorFactory[] filterCmpFactories;
+    protected int[] secondaryFilterFields;
+    protected int[] primaryFilterFields;
+    protected int[] primaryBTreeFields;
+    protected int[] secondaryBTreeFields;
+    protected List<ExternalFile> externalFiles;
+
+    // Prevent public construction. Instances should be created via createIndexOperationsHelper().
+    protected SecondaryIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
+            IAsterixPropertiesProvider propertiesProvider) {
+        this.physOptConf = physOptConf;
+        this.propertiesProvider = propertiesProvider;
+    }
+
+    public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
+            String datasetName, String indexName, List<List<String>> secondaryKeyFields,
+            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
+            PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
+            throws AsterixException, AlgebricksException {
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        SecondaryIndexOperationsHelper indexOperationsHelper = null;
+        switch (indexType) {
+            case BTREE: {
+                indexOperationsHelper = new SecondaryBTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
+                break;
+            }
+            case RTREE: {
+                indexOperationsHelper = new SecondaryRTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
+                break;
+            }
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX: {
+                indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(physOptConf,
+                        asterixPropertiesProvider);
+                break;
+            }
+            default: {
+                throw new AsterixException("Unknown Index Type: " + indexType);
+            }
+        }
+        indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
+                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, enforcedType);
+        return indexOperationsHelper;
+    }
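+    // A hedged usage sketch (the surrounding variables are assumptions, not part of this
+    // class): callers obtain a helper from the factory above and then build whichever job
+    // specs they need, e.g.
+    //
+    //   SecondaryIndexOperationsHelper helper = SecondaryIndexOperationsHelper
+    //           .createIndexOperationsHelper(IndexType.BTREE, dataverseName, datasetName,
+    //                   indexName, secondaryKeyFields, secondaryKeyTypes, false /* isEnforced */,
+    //                   -1 /* gramLength: n-gram indexes only */, metadataProvider,
+    //                   physOptConf, recType, null /* enforcedType */);
+    //   JobSpecification createSpec = helper.buildCreationJobSpec();
+    //   JobSpecification loadSpec = helper.buildLoadingJobSpec();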
+
+    public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
+
+    public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
+
+    public abstract JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException;
+
+    protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
+            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
+            ARecordType aRecType, ARecordType enforcedType) throws AsterixException, AlgebricksException {
+        this.metadataProvider = metadataProvider;
+        dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
+        datasetName = dsn;
+        secondaryIndexName = in;
+        isEnforcingKeyTypes = isEnforced;
+        dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Unknown dataset " + datasetName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        itemType = aRecType;
+        enforcedItemType = enforcedType;
+        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        numSecondaryKeys = secondaryKeyFields.size();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
+        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            numPrimaryKeys = ExternalIndexingOperations.getRIDSize(dataset);
+        } else {
+            filterFieldName = DatasetUtils.getFilterField(dataset);
+            if (filterFieldName != null) {
+                numFilterFields = 1;
+            } else {
+                numFilterFields = 0;
+            }
+
+            numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+            primaryFileSplitProvider = primarySplitsAndConstraint.first;
+            primaryPartitionConstraint = primarySplitsAndConstraint.second;
+            setPrimaryRecDescAndComparators();
+        }
+        setSecondaryRecDescAndComparators(indexType, secondaryKeyFields, secondaryKeyTypes, gramLength,
+                metadataProvider);
+        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        mergePolicyFactory = compactionInfo.first;
+        mergePolicyFactoryProperties = compactionInfo.second;
+
+        if (numFilterFields > 0) {
+            setFilterTypeTraitsAndComparators();
+        }
+    }
+
+    protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
+        filterTypeTraits = new ITypeTraits[numFilterFields];
+        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
+        secondaryFilterFields = new int[numFilterFields];
+        primaryFilterFields = new int[numFilterFields];
+        primaryBTreeFields = new int[numPrimaryKeys + 1];
+        secondaryBTreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < primaryBTreeFields.length; i++) {
+            primaryBTreeFields[i] = i;
+        }
+        for (int i = 0; i < secondaryBTreeFields.length; i++) {
+            secondaryBTreeFields[i] = i;
+        }
+
+        IAType type;
+        try {
+            type = itemType.getSubFieldType(filterFieldName);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+        filterCmpFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
+        filterTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
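+        // The filter value sits after [secondary keys | primary keys] in the secondary
+        // index, and after [primary keys | record] in the primary index, hence the two offsets.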
+        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
+        primaryFilterFields[0] = numPrimaryKeys + 1;
+    }
+
+    protected abstract int getNumSecondaryKeys();
+
+    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numPrimaryKeys = partitioningKeys.size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType;
+            try {
+                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+            } catch (IOException e) {
+                throw new AlgebricksException(e);
+            }
+            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, true);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            primaryBloomFilterKeyFields[i] = i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
+
+    protected abstract void setSecondaryRecDescAndComparators(IndexType indexType,
+            List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes, int gramLength,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException;
+
+    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
+            AlgebricksException {
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
+
+    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
+        // -Infinity
+        int[] lowKeyFields = null;
+        // +Infinity
+        int[] highKeyFields = null;
+        ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+        JobId jobId = JobIdFactory.generateJobId();
+        metadataProvider.setJobId(jobId);
+        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+        boolean temp = dataset.getDatasetDetails().isTemp();
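+        // Temp datasets are not logged or recovered, so their scans skip the transactional
+        // instant-lock search callback and use a no-op callback instead.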
+        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
+                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
+                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        mergePolicyFactory, mergePolicyFactoryProperties, new PrimaryIndexOperationTrackerProvider(
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+                                .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        primaryBTreeFields, primaryFilterFields, !temp), false, false, null,
+                searchCallbackFactory, null, null);
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primaryPartitionConstraint);
+        return primarySearchOp;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
+            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
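+        // The assign writes the extracted secondary keys (and the optional filter value)
+        // into columns after the primary keys; the projection then reorders the output to
+        // [secondary keys | primary keys | filter] to match secondaryRecDesc.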
+        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
+        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
+        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
+            outColumns[i] = numPrimaryKeys + i;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+        if (numFilterFields > 0) {
+            projectionList[projCount++] = numPrimaryKeys + numSecondaryKeyFields;
+        }
+
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                    secondaryFieldAccessEvalFactories[i]);
+        }
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                primaryPartitionConstraint);
+        return asterixAssignOp;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec,
+            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, DatasetType dsType) {
+        CastRecordDescriptor castFuncDesc = (CastRecordDescriptor) CastRecordDescriptor.FACTORY
+                .createFunctionDescriptor();
+        castFuncDesc.reset(enforcedItemType, itemType);
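+        // Cast each record from its stored (open) type to the enforced type so that
+        // statically-typed secondary keys can be extracted downstream.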
+
+        int[] outColumns = new int[1];
+        int[] projectionList = new int[1 + numPrimaryKeys];
+        int recordIdx;
+        // The external data-scan operator returns the record as the first field, whereas
+        // internal scans return it as the last field.
+        if (dsType == DatasetType.EXTERNAL) {
+            recordIdx = 0;
+            outColumns[0] = 0;
+        } else {
+            recordIdx = numPrimaryKeys;
+            outColumns[0] = numPrimaryKeys;
+        }
+        for (int i = 0; i <= numPrimaryKeys; i++) {
+            projectionList[i] = i;
+        }
+        ICopyEvaluatorFactory[] castEvalFact = new ICopyEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
+        sefs[0] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                castFuncDesc.createEvaluatorFactory(castEvalFact));
+        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor castRecAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { castAssign }, new RecordDescriptor[] { enforcedRecDesc });
+
+        return castRecAssignOp;
+    }
+
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+            throws MetadataException, AlgebricksException {
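+        // The upstream operators already emit columns in index order, so the field
+        // permutation is the identity.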
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            fieldPermutation[i] = i;
+        }
+        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
+
+    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
+        NotDescriptor notDesc = new NotDescriptor();
+        IsNullDescriptor isNullDesc = new IsNullDescriptor();
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            // Access column i, and apply 'is not null'.
+            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
+            ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
+                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
+            ICopyEvaluatorFactory notEvalFactory = notDesc
+                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
+            andArgsEvalFactories[i] = notEvalFactory;
+        }
+        ICopyEvaluatorFactory selectCond = null;
+        if (numSecondaryKeyFields > 1) {
+            // Create conjunctive condition where all secondary index keys must
+            // satisfy 'is not null'.
+            AndDescriptor andDesc = new AndDescriptor();
+            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+        } else {
+            selectCond = andArgsEvalFactories[0];
+        }
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
+                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
+                null, AqlBinaryBooleanInspectorImpl.FACTORY, false, -1, null);
+        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
+                primaryPartitionConstraint);
+        return asterixSelectOp;
+    }
+
+    // This method creates a source indexing operator for external data
+    protected ExternalDataScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
+            throws AlgebricksException, AsterixException {
+        // A record + primary keys
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
+        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        // payload serde and type traits for the record slot
+        serdes[0] = payloadSerde;
+        typeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        // Serdes and type traits for the RID fields
+        for (int i = 1; i < serdes.length; i++) {
+            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
+            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
+        }
+        // output record desc
+        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);
+
+        // Create the operator and its partition constraints
+        Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
+        try {
+            indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
+                    dataset, itemType, indexerDesc, externalFiles);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+                indexingOpAndConstraints.second);
+
+        // Use this operator's partition constraints as the primary partition constraints.
+        primaryPartitionConstraint = indexingOpAndConstraints.second;
+        return indexingOpAndConstraints.first;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys)
+            throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
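+        // The external scan emits [record | RID fields] with the record in field 0, so the
+        // extracted keys land after them; the projection keeps [secondary keys | RIDs] and
+        // drops the record.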
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            outColumns[i] = i + numPrimaryKeys + 1;
+            projectionList[i] = i + numPrimaryKeys + 1;
+        }
+
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                    secondaryFieldAccessEvalFactories[i]);
+        }
+        // Add the external RID fields to the projection list
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[numSecondaryKeys + i] = i + 1;
+        }
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        return asterixAssignOp;
+    }
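
To make the column bookkeeping above concrete: the assign op's input layout is [record(0), RID_0(1), ..., RID_n-1(n)], new secondary-key columns are appended after the existing fields, and the projection keeps the secondary keys first, then the RIDs, dropping the record at slot 0. A worked example in plain Java, assuming (purely for illustration) 2 RID fields and 1 secondary key:

    public final class ProjectionSketch {
        public static void main(String[] args) {
            int numPrimaryKeys = 2;   // RID fields (assumption for illustration)
            int numSecondaryKeys = 1; // one secondary key

            // Input: [record(0), RID_0(1), RID_1(2)]; new columns go after them.
            int[] outColumns = new int[numSecondaryKeys];
            int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
            for (int i = 0; i < numSecondaryKeys; i++) {
                outColumns[i] = i + numPrimaryKeys + 1;     // -> {3}
                projectionList[i] = i + numPrimaryKeys + 1; // SKs projected first
            }
            for (int i = 0; i < numPrimaryKeys; i++) {
                projectionList[numSecondaryKeys + i] = i + 1; // then the RIDs
            }
            System.out.println(java.util.Arrays.toString(outColumns));     // [3]
            System.out.println(java.util.Arrays.toString(projectionList)); // [3, 1, 2]
        }
    }
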
+
+    protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
+            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+            throws MetadataException, AlgebricksException {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+            fieldPermutation[i] = i;
+        }
+        // Collect the file numbers of the files that are pending drop.
+        int numOfDeletedFiles = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                numOfDeletedFiles++;
+            }
+        }
+        int[] deletedFiles = new int[numOfDeletedFiles];
+        int i = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                deletedFiles[i++] = file.getFileNumber();
+            }
+        }
+        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor(
+                spec, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryBloomFilterKeyFields, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE, deletedFiles, fieldPermutation, fillFactor, numElementsHint);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
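
The deleted-file bookkeeping above is a simple count-then-fill pass over the same list; note that the fill index must be incremented on every match (the original second loop omitted the increment, which would overwrite slot 0 repeatedly and leave the remaining slots as zeros). A standalone sketch of the corrected pattern, using a hypothetical FileInfo type in place of ExternalFile:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical stand-in for ExternalFile with a pending-drop flag.
    final class FileInfo {
        final int fileNumber;
        final boolean pendingDrop;
        FileInfo(int fileNumber, boolean pendingDrop) {
            this.fileNumber = fileNumber;
            this.pendingDrop = pendingDrop;
        }
    }

    final class DeletedFilesSketch {
        // Pass 1 counts the matches, pass 2 fills the array.
        static int[] deletedFileNumbers(List<FileInfo> files) {
            int count = 0;
            for (FileInfo f : files) {
                if (f.pendingDrop) {
                    count++;
                }
            }
            int[] deleted = new int[count];
            int i = 0;
            for (FileInfo f : files) {
                if (f.pendingDrop) {
                    deleted[i++] = f.fileNumber; // post-increment is essential
                }
            }
            return deleted;
        }

        public static void main(String[] args) {
            List<FileInfo> files = List.of(new FileInfo(10, true),
                    new FileInfo(11, false), new FileInfo(12, true));
            System.out.println(Arrays.toString(deletedFileNumbers(files))); // [10, 12]
        }
    }
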
+
+    public List<ExternalFile> getExternalFiles() {
+        return externalFiles;
+    }
+
+    public void setExternalFiles(List<ExternalFile> externalFiles) {
+        this.externalFiles = externalFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
new file mode 100644
index 0000000..74c4256
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.formats.FormatUtils;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    private IAType secondaryKeyType;
+    private ITypeTraits[] invListsTypeTraits;
+    private IBinaryComparatorFactory[] tokenComparatorFactories;
+    private ITypeTraits[] tokenTypeTraits;
+    private IBinaryTokenizerFactory tokenizerFactory;
+    // For tokenization, sorting and loading. Represents <token, primary keys>.
+    private int numTokenKeyPairFields;
+    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
+    private RecordDescriptor tokenKeyPairRecDesc;
+    private boolean isPartitioned;
+    private int[] invertedIndexFields;
+    private int[] invertedIndexFieldsForNonBulkLoadOps;
+    private int[] secondaryFilterFieldsForNonBulkLoadOps;
+
+    protected SecondaryInvertedIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
+            IAsterixPropertiesProvider propertiesProvider) {
+        super(physOptConf, propertiesProvider);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
+            List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException,
+            AsterixException {
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new AsterixException("Cannot create an inverted index on a dataset with a composite primary key.");
+        }
+        if (numSecondaryKeys > 1) {
+            throw new AsterixException("Cannot create a composite inverted index on multiple fields.");
+        }
+        isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
+        // Prepare record descriptor used in the assign op, and the optional
+        // select op.
+        secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
+                + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
+        if (numSecondaryKeys > 0) {
+            secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
+                    isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(0), numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyFields.get(0), itemType);
+            secondaryKeyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            secondaryRecFields[0] = keySerde;
+            secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
+        }
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType,
+                gramLength);
+        // Type traits for inverted-list elements. Inverted lists contain
+        // primary keys.
+        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
+        if (numPrimaryKeys > 0) {
+            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
+            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+        }
+        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+        ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields
+                + numFilterFields];
+        ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
+        tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
+        if (numPrimaryKeys > 0) {
+            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
+            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
+            tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
+        }
+        if (numFilterFields > 0) {
+            tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
+        }
+        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+        }
+
+    }
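
To make the pkOff bookkeeping above concrete: the token/key-pair layout is <token, [length], primary key, [filter]>, so the primary key lands at slot 1 for word/ngram indexes and at slot 2 for their length-partitioned variants, where a short length field is interposed. A small sketch (plain Java; the slot labels are illustrative only) of how the layout varies:

    import java.util.ArrayList;
    import java.util.List;

    final class TokenKeyPairLayoutSketch {
        // Mirrors numTokenKeyPairFields / pkOff for one secondary key and one PK.
        static List<String> layout(boolean isPartitioned, boolean hasFilter) {
            List<String> slots = new ArrayList<>();
            slots.add("token");
            if (isPartitioned) {
                slots.add("length (short, untagged)");
            }
            slots.add("primary key"); // slot index == pkOff (1 or 2)
            if (hasFilter) {
                slots.add("filter field");
            }
            return slots;
        }

        public static void main(String[] args) {
            System.out.println(layout(false, false)); // [token, primary key]
            System.out.println(layout(true, true));   // [token, length (short, untagged), primary key, filter field]
        }
    }
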
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numTokenKeyPairFields - numPrimaryKeys;
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        // Prepare a LocalResourceMetadata that will be stored in the NC's local resource repository.
+        ILocalResourceMetadata localResourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits,
+                primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
+                dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
+                filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
+                invertedIndexFieldsForNonBulkLoadOps);
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMInvertedIndexResource);
+
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(invIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        // Create dummy key provider for feeding the primary index scan.
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+        // Create primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+        AbstractOperatorDescriptor sourceOp = primaryScanOp;
+        if (isEnforcingKeyTypes) {
+            sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+        }
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys);
+
+        // If any of the secondary fields are nullable, then add a select op
+        // that filters nulls.
+        AlgebricksMetaOperatorDescriptor selectOp = null;
+        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+        }
+
+        // Create a tokenizer op.
+        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
+
+        // Sort by token + primary keys.
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
+
+        // Create secondary inverted index bulk load op.
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
+        } else {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
+        }
+        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
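
The loading job above is a straight pipeline with two optional stages. A compact sketch of its shape (plain Java; the stage names are just labels, not operator identifiers), including the select stage that appears only when nulls are possible:

    import java.util.ArrayList;
    import java.util.List;

    final class LoadingPipelineSketch {
        static List<String> stages(boolean mayHaveNulls, boolean enforcingTypes) {
            List<String> ops = new ArrayList<>(List.of("key provider", "primary scan"));
            if (enforcingTypes) {
                ops.add("cast");
            }
            ops.add("assign (extract keys)");
            if (mayHaveNulls || enforcingTypes) {
                ops.add("select (filter nulls)");
            }
            ops.addAll(List.of("tokenize", "sort <token, pk>",
                    "inverted-index bulk load", "sink"));
            return ops;
        }

        public static void main(String[] args) {
            System.out.println(String.join(" -> ", stages(true, false)));
        }
    }
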
+
+    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
+        int docField = 0;
+        int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = numSecondaryKeys + i;
+        }
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+                tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+                primaryPartitionConstraint);
+        return tokenizerOp;
+    }
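
For reference, the tokenizer's input is the assign/select output [secondary keys, primary keys, filter fields], so with one secondary key the document field is column 0 and the forwarded key/filter columns immediately follow it. A tiny arithmetic check, assuming one secondary key, one primary key, and one filter field purely for illustration:

    final class TokenizerFieldsSketch {
        public static void main(String[] args) {
            int numSecondaryKeys = 1, numPrimaryKeys = 1, numFilterFields = 1;
            int docField = 0; // the secondary key being tokenized
            int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
            for (int i = 0; i < primaryKeyFields.length; i++) {
                primaryKeyFields[i] = numSecondaryKeys + i; // -> {1, 2}
            }
            System.out.println(docField + " "
                    + java.util.Arrays.toString(primaryKeyFields)); // 0 [1, 2]
        }
    }
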
+
+    @Override
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        // Sort on token and primary keys.
+        int[] sortFields = new int[numTokenKeyPairFields];
+        for (int i = 0; i < numTokenKeyPairFields; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
+        int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            fieldPermutation[i] = i;
+        }
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
+                spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return invIndexBulkLoadOp;
+    }
+
+    private IIndexDataflowHelperFactory createDataflowHelperFactory() {
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        if (!isPartitioned) {
+            return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                    dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
+                    invertedIndexFieldsForNonBulkLoadOps, !temp);
+        } else {
+            return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                    dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
+                    invertedIndexFieldsForNonBulkLoadOps, !temp);
+        }
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
+        LSMInvertedIndexCompactOperator compactOp = new LSMInvertedIndexCompactOperator(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

