asterixdb-commits mailing list archives

From: luoc...@apache.org
Subject: [1/5] asterixdb git commit: [ASTERIXDB-1946][STO][IDX]Create RTree/InvertedIdx for Correlated Datasets
Date: Tue, 18 Jul 2017 02:04:15 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 77109ea4b -> e856e1e8f


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
new file mode 100644
index 0000000..15f8a23
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+
+public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper {
+
+    private IAType secondaryKeyType;
+    private ITypeTraits[] invListsTypeTraits;
+    private IBinaryComparatorFactory[] tokenComparatorFactories;
+    private ITypeTraits[] tokenTypeTraits;
+    private IBinaryTokenizerFactory tokenizerFactory;
+    // For tokenization, sorting and loading. Represents <token, primary keys>.
+    private int numTokenKeyPairFields;
+    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
+    private RecordDescriptor tokenKeyPairRecDesc;
+    private boolean isPartitioned;
+    private int[] invertedIndexFields;
+    private int[] invertedIndexFieldsForNonBulkLoadOps;
+    private int[] secondaryFilterFieldsForNonBulkLoadOps;
+
+    protected SecondaryCorrelatedInvertedIndexOperationsHelper(Dataset dataset, Index index,
+            PhysicalOptimizationConfig physOptConf, MetadataProvider metadataProvider) throws AlgebricksException {
+        super(dataset, index, physOptConf, metadataProvider);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
+                    indexType, RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys,
+                    indexType, 1);
+        }
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
+        // Prepare record descriptor used in the assign op, and the optional
+        // select op.
+        secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
+        int recordColumn = NUM_TAG_FIELDS + numPrimaryKeys;
+        if (numSecondaryKeys > 0) {
+            secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
+                    isOverridingKeyFieldTypes ? enforcedItemType : itemType, index.getKeyFieldNames().get(0),
+                    recordColumn);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                    index.getKeyFieldNames().get(0), itemType);
+            secondaryKeyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            secondaryRecFields[0] = keySerde;
+            secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
+        }
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, recordColumn);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType,
+                index.getGramLength());
+        // Type traits for inverted-list elements. Inverted lists contain
+        // primary keys.
+        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
+        if (numPrimaryKeys > 0) {
+            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
+            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+        }
+        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+        ISerializerDeserializer[] tokenKeyPairFields =
+                new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
+        ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
+        tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
+        if (numPrimaryKeys > 0) {
+            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
+            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
+            tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
+        }
+        if (numFilterFields > 0) {
+            tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
+        }
+        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+        }
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numTokenKeyPairFields - numPrimaryKeys;
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+
+        // Create dummy key provider for feeding the primary index scan.
+        IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+
+        // Create primary index scan op.
+        IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId);
+
+        IOperatorDescriptor sourceOp = primaryScanOp;
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) {
+            sourceOp = createCastOp(spec, dataset.getDatasetType(), index.isEnforced());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+        }
+
+        RecordDescriptor taggedSecondaryRecDesc = getTaggedRecordDescriptor(secondaryRecDesc);
+
+        RecordDescriptor taggedTokenKeyPairRecDesc = getTaggedRecordDescriptor(tokenKeyPairRecDesc);
+
+        AlgebricksMetaOperatorDescriptor asterixAssignOp =
+                createAssignOp(spec, numSecondaryKeys, taggedSecondaryRecDesc);
+
+        // Generate compensating tuples for upsert
+        IOperatorDescriptor processorOp =
+                createTupleProcessorOp(spec, taggedSecondaryRecDesc, numSecondaryKeys, numPrimaryKeys, true);
+
+        // Create a tokenizer op.
+        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
+
+        // Create a sort op
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                getTaggedSecondaryComparatorFactories(tokenKeyPairComparatorFactories), taggedTokenKeyPairRecDesc);
+
+        // Create secondary inverted index bulk load op.
+        AbstractSingleActivityOperatorDescriptor invIndexBulkLoadOp =
+                createTreeIndexBulkLoadOp(spec, metadataProvider, taggedTokenKeyPairRecDesc,
+                        createFieldPermutationForBulkLoadOp(), getNumSecondaryKeys(), numPrimaryKeys, true);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, processorOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, tokenizerOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
+        int docField = NUM_TAG_FIELDS;
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        int[] keyFields = new int[NUM_TAG_FIELDS + numPrimaryKeys + numFilterFields];
+        // set tag fields
+        for (int i = 0; i < NUM_TAG_FIELDS; i++) {
+            keyFields[i] = i;
+        }
+        // set primary key + filter fields
+        for (int i = NUM_TAG_FIELDS; i < keyFields.length; i++) {
+            keyFields[i] = i + numSecondaryKeys;
+        }
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+                getTaggedRecordDescriptor(tokenKeyPairRecDesc), tokenizerFactory, docField, keyFields, isPartitioned,
+                false, true, MissingWriterFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+                primaryPartitionConstraint);
+        return tokenizerOp;
+    }
+
+    @Override
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] taggedSecondaryComparatorFactories, RecordDescriptor taggedSecondaryRecDesc) {
+        /**
+         * After tokenization, the field layout becomes
+         * [token, num?, tag, primary key, filter value].
+         * We need to sort on
+         * [tag, token, num?, primary key].
+         */
+        int[] taggedSortFields = new int[taggedSecondaryComparatorFactories.length];
+        int numSecondaryKeys = getNumSecondaryKeys();
+        int idx = 0;
+        // set component pos fields
+        taggedSortFields[idx++] = numSecondaryKeys;
+        // set secondary keys
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            taggedSortFields[idx++] = i;
+        }
+        // set primary keys
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            taggedSortFields[idx++] = i + numSecondaryKeys + NUM_TAG_FIELDS;
+        }
+        ExternalSortOperatorDescriptor sortOp =
+                new ExternalSortOperatorDescriptor(spec, physOptConf.getMaxFramesExternalSort(), taggedSortFields,
+                        taggedSecondaryComparatorFactories, taggedSecondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
+    @Override
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        /**
+         * After tokenization, the field layout becomes
+         * [token, num?, tag, primary key, filter value].
+         * We need to restore it to
+         * [tag, token, num?, primary key, filter value].
+         */
+        int[] fieldPermutation = new int[NUM_TAG_FIELDS + numTokenKeyPairFields + numFilterFields];
+        int numSecondaryKeys = getNumSecondaryKeys();
+        int idx = 0;
+        // set tag fields
+        for (int i = 0; i < NUM_TAG_FIELDS; i++) {
+            fieldPermutation[idx++] = i + numSecondaryKeys;
+        }
+        // set secondary keys
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            fieldPermutation[idx++] = i;
+        }
+        // set primary key + filter
+        for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+            fieldPermutation[idx++] = i + NUM_TAG_FIELDS + numSecondaryKeys;
+        }
+        return fieldPermutation;
+    }
+
+}
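
To make the two index-array computations above concrete, here is a standalone sketch in plain Java. The sizes are assumptions for illustration (two tag fields, as suggested by the [component pos, anti-matter flag, ...] layout comment later in this patch; a length-partitioned index with token + num-tokens fields; one primary key; no filter field), not values taken from the patch itself:

    import java.util.Arrays;

    public class CorrelatedInvIdxLayoutSketch {
        public static void main(String[] args) {
            final int numTagFields = 2;     // component position + anti-matter flag (assumed)
            final int numSecondaryKeys = 2; // token + num-tokens for a length-partitioned index
            final int numPrimaryKeys = 1;
            final int numFilterFields = 0;

            // createSortOp: tokenized layout is [token, num, tag0, tag1, pk];
            // sort on [component pos (tag0), token, num, pk].
            int[] sortFields = new int[1 + numSecondaryKeys + numPrimaryKeys];
            int idx = 0;
            sortFields[idx++] = numSecondaryKeys; // component position tag
            for (int i = 0; i < numSecondaryKeys; i++) {
                sortFields[idx++] = i; // token, num
            }
            for (int i = 0; i < numPrimaryKeys; i++) {
                sortFields[idx++] = i + numSecondaryKeys + numTagFields; // primary key
            }
            System.out.println(Arrays.toString(sortFields)); // [2, 0, 1, 4]

            // createFieldPermutationForBulkLoadOp: restore [tag0, tag1, token, num, pk].
            int[] perm = new int[numTagFields + numSecondaryKeys + numPrimaryKeys + numFilterFields];
            idx = 0;
            for (int i = 0; i < numTagFields; i++) {
                perm[idx++] = i + numSecondaryKeys; // tags come after the keys on input
            }
            for (int i = 0; i < numSecondaryKeys; i++) {
                perm[idx++] = i; // token, num move back to the front
            }
            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
                perm[idx++] = i + numTagFields + numSecondaryKeys; // pk (and filter) stay at the tail
            }
            System.out.println(Arrays.toString(perm)); // [2, 3, 0, 1, 4]
        }
    }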

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
new file mode 100644
index 0000000..8fd8a7a
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper {
+
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    protected ATypeTag keyType;
+    protected int[] primaryKeyFields;
+    protected int[] rtreeFields;
+    protected boolean isPointMBR;
+    protected RecordDescriptor secondaryRecDescForPointMBR = null;
+
+    protected SecondaryCorrelatedRTreeOperationsHelper(Dataset dataset, Index index,
+            PhysicalOptimizationConfig physOptConf, MetadataProvider metadataProvider) throws AlgebricksException {
+        super(dataset, index, physOptConf, metadataProvider);
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numNestedSecondaryKeyFields;
+    }
+
+    @Override
+    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+        if (numSecondaryKeys != 1) {
+            throw AsterixException.create(ErrorCode.INDEX_RTREE_MULTIPLE_FIELDS_NOT_ALLOWED, numSecondaryKeys);
+        }
+        Pair<IAType, Boolean> spatialTypePair =
+                Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        int recordColumn = NUM_TAG_FIELDS + numPrimaryKeys;
+        secondaryFieldAccessEvalFactories =
+                metadataProvider.getFormat().createMBRFactory(isOverridingKeyFieldTypes ? enforcedItemType : itemType,
+                        secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName, isPointMBR);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] =
+                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] =
+                    metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory();
+
+        }
+        // Add serializers and comparators for primary index fields.
+        // Only internal datasets are supported.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+            enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+            enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+        }
+
+        enforcedRecFields[numPrimaryKeys] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        if (numFilterFields > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+
+            Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = typePair.first;
+            ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+        if (isPointMBR) {
+            int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2;
+            ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys
+                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
+            int idx = 0;
+            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
+            }
+            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i];
+            }
+            secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR);
+        }
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        /***************************************************
+         * [ About PointMBR Optimization ]
+         * Instead of storing an MBR (4 doubles) for a point (2 doubles) in an
+         * RTree leaf node, the PointMBR concept is introduced: a point is
+         * stored as 2 doubles in the RTree leaf node, which roughly halves
+         * the RTree index size.
+         * To fully benefit from PointMBR, not only the RTree but also the
+         * external sort operator used during bulk loading (from either data
+         * loading or index creation) must treat a point as 2 doubles instead
+         * of 4; otherwise the sort handles twice as many doubles as it
+         * actually requires. For this purpose, PointMBR-specific optimization
+         * logic is added as follows:
+         * 1) The CreateMBR function in the assign operator generates 2 doubles instead of 4.
+         * 2) The external sort operator sorts points represented with 2 doubles.
+         * 3) RTree bulk loading obtains 4 doubles by reading the 2 doubles twice
+         * and then does the same work as in the non-point MBR case.
+         ***************************************************/
+        JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        int numNestedSecondaryKeyFieldsConsideringPointMBR =
+                isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
+        RecordDescriptor secondaryRecDescConsideringPointMBR = isPointMBR
+                ? getTaggedRecordDescriptor(secondaryRecDescForPointMBR) : getTaggedRecordDescriptor(secondaryRecDesc);
+
+        boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
+
+        assert dataset.getDatasetType() == DatasetType.INTERNAL;
+
+        // Create dummy key provider for feeding the primary index scan.
+        IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+
+        // Create primary index scan op.
+        IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId);
+
+        // Assign op.
+        IOperatorDescriptor sourceOp = primaryScanOp;
+        if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) {
+            sourceOp = createCastOp(spec, dataset.getDatasetType(), index.isEnforced());
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+        }
+
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec,
+                numNestedSecondaryKeyFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);
+
+        // Generate compensating tuples for upsert
+        IOperatorDescriptor processorOp = createTupleProcessorOp(spec, secondaryRecDescConsideringPointMBR,
+                numNestedSecondaryKeyFieldsConsideringPointMBR, numPrimaryKeys, false);
+
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                getTaggedSecondaryComparatorFactories(new IBinaryComparatorFactory[] {
+                        MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) }),
+                secondaryRecDescConsideringPointMBR);
+
+        // Create secondary RTree bulk load op.
+        LSMSecondaryIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
+                metadataProvider, secondaryRecDescConsideringPointMBR, createFieldPermutationForBulkLoadOp(),
+                numNestedSecondaryKeyFieldsConsideringPointMBR, numPrimaryKeys, false);
+
+        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, processorOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+        spec.addRoot(metaOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+
+        return spec;
+    }
+
+    @Override
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        if (isPointMBR) {
+            int[] fieldPermutation =
+                    new int[NUM_TAG_FIELDS + numNestedSecondaryKeyFields + numPrimaryKeys + numFilterFields];
+            int idx = 0;
+            int numSecondaryKeyFieldsForPointMBR = numNestedSecondaryKeyFields / 2;
+            for (int i = 0; i < NUM_TAG_FIELDS + numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = i;
+            }
+            // add the rest of the secondary key fields for PointMBR
+            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = NUM_TAG_FIELDS + i;
+            }
+            // add the primary key and filter fields
+            int end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + numFilterFields;
+            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
+                fieldPermutation[idx++] = NUM_TAG_FIELDS + i;
+            }
+            return fieldPermutation;
+        } else {
+            return super.createFieldPermutationForBulkLoadOp();
+        }
+    }
+}
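
To see how this permutation feeds the PointMBR trick described in buildLoadingJobSpec (reading the point's 2 doubles twice to form a 4-double MBR), here is a standalone sketch in plain Java; the sizes are assumptions for illustration, not values from the patch:

    import java.util.Arrays;

    public class PointMbrPermutationSketch {
        public static void main(String[] args) {
            final int numTagFields = 2;                // component position + anti-matter flag (assumed)
            final int numNestedSecondaryKeyFields = 4; // a full 2-D MBR has 4 coordinates
            final int numPointFields = numNestedSecondaryKeyFields / 2; // the point itself has 2
            final int numPrimaryKeys = 1;
            final int numFilterFields = 0;

            // Input tuple layout: [tag0, tag1, x, y, pk]
            int[] perm = new int[numTagFields + numNestedSecondaryKeyFields + numPrimaryKeys + numFilterFields];
            int idx = 0;
            for (int i = 0; i < numTagFields + numPointFields; i++) {
                perm[idx++] = i;                    // tags plus the first copy of (x, y)
            }
            for (int i = 0; i < numPointFields; i++) {
                perm[idx++] = numTagFields + i;     // (x, y) again: the point read twice forms the MBR
            }
            int end = numPointFields + numPrimaryKeys + numFilterFields;
            for (int i = numPointFields; i < end; i++) {
                perm[idx++] = numTagFields + i;     // primary key (and filter) fields
            }
            // Prints [0, 1, 2, 3, 2, 3, 4]: the bulk load sees the MBR (x, y, x, y)
            System.out.println(Arrays.toString(perm));
        }
    }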

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index fa33790..171d72e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -263,8 +263,8 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
     }
 
     protected LSMSecondaryIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
-            MetadataProvider metadataProvider, RecordDescriptor taggedSecondaryRecDesc, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBtree) throws AlgebricksException {
+            MetadataProvider metadataProvider, RecordDescriptor taggedSecondaryRecDesc, int[] fieldPermutation,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBtree) throws AlgebricksException {
         IndexDataflowHelperFactory primaryIndexHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
 
@@ -273,7 +273,8 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
 
         LSMSecondaryIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
                 new LSMSecondaryIndexBulkLoadOperatorDescriptor(spec, taggedSecondaryRecDesc, primaryIndexHelperFactory,
-                        secondaryIndexHelperFactory, NUM_TAG_FIELDS, numSecondaryKeys, numPrimaryKeys, hasBuddyBtree);
+                        secondaryIndexHelperFactory, fieldPermutation, NUM_TAG_FIELDS, numSecondaryKeys,
+                        numPrimaryKeys, hasBuddyBtree);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
@@ -306,14 +307,16 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
                         new SecondaryCorrelatedBTreeOperationsHelper(dataset, index, physOptConf, metadataProvider);
                 break;
             case RTREE:
-                //TODO RTree
+                indexOperationsHelper =
+                        new SecondaryCorrelatedRTreeOperationsHelper(dataset, index, physOptConf, metadataProvider);
+                break;
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
-                //TODO Inverted Index
-                //TODO This will be fixed soon
-                throw new UnsupportedOperationException();
+                indexOperationsHelper = new SecondaryCorrelatedInvertedIndexOperationsHelper(dataset, index,
+                        physOptConf, metadataProvider);
+                break;
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType());
         }
@@ -321,4 +324,12 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
         return indexOperationsHelper;
     }
 
+    protected int[] createFieldPermutationForBulkLoadOp() {
+        int[] fieldPermutation = new int[NUM_TAG_FIELDS + getNumSecondaryKeys() + numPrimaryKeys + numFilterFields];
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            fieldPermutation[i] = i;
+        }
+        return fieldPermutation;
+    }
+
 }
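
For contrast with the RTree and inverted-index overrides, the new base-class default above is the identity mapping, since non-tokenizing correlated indexes keep the [tags, secondary keys, primary keys, filters] order end to end. A quick standalone check with assumed sizes (2 tag fields, 1 secondary key, 1 primary key, no filter):

    import java.util.Arrays;

    public class DefaultPermutationSketch {
        public static void main(String[] args) {
            int numTagFields = 2, numSecondaryKeys = 1, numPrimaryKeys = 1, numFilterFields = 0;
            int[] perm = new int[numTagFields + numSecondaryKeys + numPrimaryKeys + numFilterFields];
            for (int i = 0; i < perm.length; i++) {
                perm[i] = i; // identity: no reordering needed
            }
            System.out.println(Arrays.toString(perm)); // [0, 1, 2, 3]
        }
    }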

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index cb15b98..b4d8a22 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -23,6 +23,7 @@ import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -213,8 +214,8 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
-        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                jobId);
+        IOperatorDescriptor primaryScanOp =
+                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, jobId);
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
@@ -270,8 +271,9 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
         for (int i = 0; i < primaryKeyFields.length; i++) {
             primaryKeyFields[i] = numSecondaryKeys + i;
         }
-        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
-                tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
+        BinaryTokenizerOperatorDescriptor tokenizerOp =
+                new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
+                        primaryKeyFields, isPartitioned, false, false, MissingWriterFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
                 primaryPartitionConstraint);
         return tokenizerOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
index 6b050af..0d71f71 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java
@@ -24,7 +24,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
@@ -33,9 +32,6 @@ public abstract class AbstractLSMSecondaryIndexCreationNodePushable
     protected final IHyracksTaskContext ctx;
     protected final RecordDescriptor inputRecDesc;
 
-    // with tag fields
-    protected final FrameTupleReference tuple;
-
     protected final int partition;
     protected final int numTagFields;
     protected final int numSecondaryKeys;
@@ -50,7 +46,6 @@ public abstract class AbstractLSMSecondaryIndexCreationNodePushable
             boolean hasBuddyBTree) {
         this.ctx = ctx;
         this.inputRecDesc = inputRecDesc;
-        this.tuple = new FrameTupleReference();
 
         this.partition = partition;
         this.numTagFields = numTagFields;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index f09bcd2..2eb84fb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
@@ -45,7 +46,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
  */
 public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable {
     // with tag fields
-
+    private final PermutingFrameTupleReference tuple;
     private final PermutingTupleReference sourceTuple;
     private final PermutingTupleReference deletedKeyTuple;
 
@@ -63,16 +64,17 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
 
     public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
-            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {
         super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
 
         this.primaryIndexHelper =
                 primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.secondaryIndexHelper =
                 secondaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.tuple = new PermutingFrameTupleReference(fieldPermutation);
 
-        int[] sourcePermutation = new int[inputRecDesc.getFieldCount() - numTagFields];
+        int[] sourcePermutation = new int[fieldPermutation.length - numTagFields];
         for (int i = 0; i < sourcePermutation.length; i++) {
             sourcePermutation[i] = i + numTagFields;
         }
@@ -170,7 +172,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
                     loadNewComponent(componentPos);
                     currentComponentPos = componentPos;
                 }
-
                 if (isAntiMatterTuple(tuple)) {
                     addAntiMatterTuple(tuple);
                 } else {
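
Note that sourcePermutation above is now sized from fieldPermutation.length rather than from the input record descriptor. A minimal sketch of the arithmetic, with hypothetical sizes (two tag fields, and the inverted-index restore permutation from earlier in this patch):

    import java.util.Arrays;

    public class SourcePermutationSketch {
        public static void main(String[] args) {
            int numTagFields = 2;
            int[] fieldPermutation = {2, 3, 0, 1, 4}; // e.g. the inverted-index restore permutation
            int[] sourcePermutation = new int[fieldPermutation.length - numTagFields];
            for (int i = 0; i < sourcePermutation.length; i++) {
                sourcePermutation[i] = i + numTagFields; // skip the leading tag fields
            }
            System.out.println(Arrays.toString(sourcePermutation)); // [2, 3, 4]
        }
    }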

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 804b700..4f1dfd7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -35,6 +35,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA
     private final IIndexDataflowHelperFactory primaryIndexHelperFactory;
     private final IIndexDataflowHelperFactory secondaryIndexHelperFactory;
 
+    private final int[] fieldPermutation;
     private final int numTagFields;
     private final int numSecondaryKeys;
     private final int numPrimaryKeys;
@@ -43,12 +44,13 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA
 
     public LSMSecondaryIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
-            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys,
-            int numPrimaryKeys, boolean hasBuddyBTree) {
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
         super(spec, 1, 1);
         this.outRecDescs[0] = outRecDesc;
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
+        this.fieldPermutation = fieldPermutation;
         this.numTagFields = numTagFields;
         this.numSecondaryKeys = numSecondaryKeys;
         this.numPrimaryKeys = numPrimaryKeys;
@@ -61,6 +63,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new LSMSecondaryIndexBulkLoadNodePushable(ctx, partition,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), primaryIndexHelperFactory,
-                secondaryIndexHelperFactory, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
+                secondaryIndexHelperFactory, fieldPermutation, numTagFields, numSecondaryKeys, numPrimaryKeys,
+                hasBuddyBTree);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
index b2a2fa4..9376d1b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -69,6 +70,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
  * [component pos, anti-matter flag, secondary keys, primary keys, filter values]
  */
 public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable {
+    private final FrameTupleReference tuple = new FrameTupleReference();
     // prevSourceTuple stores the previous matter tuple
     private final ArrayTupleBuilder prevMatterTupleBuilder;
     private final ArrayTupleReference prevMatterTuple = new ArrayTupleReference();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index fe9decd..3ed47e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.dataflow;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,13 @@ public class BinaryTokenizerOperatorDescriptor extends AbstractSingleActivityOpe
     // False: [token, number of tokens (if a partitioned index), keyfield1, keyfield2 ...]
     private final boolean writeKeyFieldsFirst;
 
+    private final boolean writeMissing;
+
+    private final IMissingWriterFactory missingWriterFactory;
+
     public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
+            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey,
+            boolean writeKeyFieldsFirst, boolean writeMissing, IMissingWriterFactory missingWriterFactory) {
         super(spec, 1, 1);
         this.tokenizerFactory = tokenizerFactory;
         this.docField = docField;
@@ -56,13 +62,16 @@ public class BinaryTokenizerOperatorDescriptor extends AbstractSingleActivityOpe
         this.addNumTokensKey = addNumTokensKey;
         outRecDescs[0] = recDesc;
         this.writeKeyFieldsFirst = writeKeyFieldsFirst;
+        this.writeMissing = writeMissing;
+        this.missingWriterFactory = missingWriterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
-                getActivityId(), 0), outRecDescs[0], tokenizerFactory.createTokenizer(), docField, keyFields,
-                addNumTokensKey, writeKeyFieldsFirst);
+        return new BinaryTokenizerOperatorNodePushable(ctx,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0],
+                tokenizerFactory.createTokenizer(), docField, keyFields, addNumTokensKey, writeKeyFieldsFirst,
+                writeMissing, missingWriterFactory);
     }
 }
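
The two new trailing arguments let callers opt into emitting a placeholder tuple for
records whose doc field is MISSING instead of silently dropping them. A hedged usage
sketch, not taken from this patch: the class name, record descriptor, tokenizer
choice, and field indexes are placeholders, and only the parameter order matches the
constructor above.

    import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
    import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
    import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
    import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
    import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
    import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
    import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;

    public class TokenizerUsageSketch {
        public static void main(String[] args) {
            JobSpecification spec = new JobSpecification();
            // Assumed output layout: [token, primary key] (writeKeyFieldsFirst == false)
            RecordDescriptor tokenKeyRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                    new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
            IBinaryTokenizerFactory tokenizerFactory =
                    new DelimitedUTF8StringBinaryTokenizerFactory(true, false, new UTF8WordTokenFactory());
            BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(
                    spec,
                    tokenKeyRecDesc,
                    tokenizerFactory,
                    0,                              // docField: index of the field to tokenize
                    new int[] { 1 },                // keyFields: copied through with every token
                    false,                          // addNumTokensKey: omit the token-count field
                    false,                          // writeKeyFieldsFirst: token comes first
                    true,                           // writeMissing: emit one tuple for a MISSING doc field
                    MissingWriterFactory.INSTANCE); // supplies the IMissingWriter for that tuple
        }
    }

In this commit the metadata operations helpers are the callers that thread these two
arguments through; the literal flag values above are for illustration only.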

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 002457b..3df185a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.GrowableArray;
@@ -31,6 +33,8 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
@@ -45,6 +49,9 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
     private final boolean writeKeyFieldsFirst;
     private final RecordDescriptor inputRecDesc;
     private final RecordDescriptor outputRecDesc;
+    private final boolean writeMissing;
+    private final IMissingWriter missingWriter;
+    private final FrameTupleReference tuple = new FrameTupleReference();
 
     private FrameTupleAccessor accessor;
     private ArrayTupleBuilder builder;
@@ -53,7 +60,8 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
 
     public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
             RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
-            boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
+            boolean addNumTokensKey, boolean writeKeyFieldsFirst, boolean writeMissing,
+            IMissingWriterFactory missingWriterFactory) {
         this.ctx = ctx;
         this.tokenizer = tokenizer;
         this.docField = docField;
@@ -62,6 +70,8 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
         this.inputRecDesc = inputRecDesc;
         this.outputRecDesc = outputRecDesc;
         this.writeKeyFieldsFirst = writeKeyFieldsFirst;
+        this.writeMissing = writeMissing;
+        this.missingWriter = missingWriterFactory.createMissingWriter();
     }
 
     @Override
@@ -79,76 +89,85 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
         int tupleCount = accessor.getTupleCount();
 
         for (int i = 0; i < tupleCount; i++) {
-            short numTokens = 0;
+            tuple.reset(accessor, i);
 
-            tokenizer.reset(accessor.getBuffer().array(), accessor.getTupleStartOffset(i)
-                    + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, docField),
-                    accessor.getFieldLength(i, docField));
+            short numTokens = 0;
 
-            if (addNumTokensKey) {
-                // Get the total number of tokens.
-                numTokens = tokenizer.getTokensCount();
+            if (!isDocFieldMissing(tuple)) {
+                tokenizer.reset(tuple.getFieldData(docField), tuple.getFieldStart(docField),
+                        tuple.getFieldLength(docField));
+                if (addNumTokensKey) {
+                    // Get the total number of tokens.
+                    numTokens = tokenizer.getTokensCount();
+                }
+                // Write token and data into frame by following the order specified
+                // in the writeKeyFieldsFirst field.
+                while (tokenizer.hasNext()) {
+                    tokenizer.next();
+                    IToken token = tokenizer.getToken();
+                    writeTuple(token, numTokens, i);
+                }
+            } else if (writeMissing) {
+                writeTuple(null, 0, i);
             }
+        }
 
-            // Write token and data into frame by following the order specified
-            // in the writeKeyFieldsFirst field.
-            while (tokenizer.hasNext()) {
-
-                tokenizer.next();
-
-                builder.reset();
-
-                // Writing Order: token, number of token, keyfield1 ... n
-                if (!writeKeyFieldsFirst) {
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderData);
-
-                        builder.addFieldEndOffset();
-                        // Add number of tokens if requested.
-                        if (addNumTokensKey) {
-                            builder.getDataOutput().writeShort(numTokens);
-                            builder.addFieldEndOffset();
-                        }
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
-                    }
-
-                    for (int k = 0; k < keyFields.length; k++) {
-                        builder.addField(accessor, i, keyFields[k]);
-                    }
+    }
 
+    private void writeTuple(IToken token, int numTokens, int tupleIdx) throws HyracksDataException {
+        builder.reset();
+
+        // Writing order: token, number of tokens, keyfield1 ... n
+        if (!writeKeyFieldsFirst) {
+            try {
+                if (token != null) {
+                    token.serializeToken(builderData);
+                    builder.addFieldEndOffset();
+                } else {
+                    missingWriter.writeMissing(builder.getDataOutput());
+                    builder.addFieldEndOffset();
                 }
-                // Writing Order: keyfield1 ... n, token, number of token
-                else {
-
-                    for (int k = 0; k < keyFields.length; k++) {
-                        builder.addField(accessor, i, keyFields[k]);
-                    }
-
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderData);
-
-                        builder.addFieldEndOffset();
-                        // Add number of tokens if requested.
-                        if (addNumTokensKey) {
-                            builder.getDataOutput().writeShort(numTokens);
-                            builder.addFieldEndOffset();
-                        }
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
-                    }
 
+                // Add number of tokens if requested.
+                if (addNumTokensKey) {
+                    builder.getDataOutput().writeShort(numTokens);
+                    builder.addFieldEndOffset();
                 }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
 
-                FrameUtils.appendToWriter(writer, appender, builder.getFieldEndOffsets(), builder.getByteArray(), 0,
-                        builder.getSize());
+            for (int k = 0; k < keyFields.length; k++) {
+                builder.addField(accessor, tupleIdx, keyFields[k]);
+            }
 
+        }
+        // Writing order: keyfield1 ... n, token, number of tokens
+        else {
+            for (int k = 0; k < keyFields.length; k++) {
+                builder.addField(accessor, tupleIdx, keyFields[k]);
             }
 
+            try {
+                if (token != null) {
+                    token.serializeToken(builderData);
+                    builder.addFieldEndOffset();
+                } else {
+                    missingWriter.writeMissing(builder.getDataOutput());
+                    builder.addFieldEndOffset();
+                }
+                // Add number of tokens if requested.
+                if (addNumTokensKey) {
+                    builder.getDataOutput().writeShort(numTokens);
+                    builder.addFieldEndOffset();
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
         }
 
+        FrameUtils.appendToWriter(writer, appender, builder.getFieldEndOffsets(), builder.getByteArray(), 0,
+                builder.getSize());
     }
 
     @Override
@@ -169,4 +188,14 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
     public void flush() throws HyracksDataException {
         appender.flush(writer);
     }
+
+    /**
+     * Returns whether the doc field is missing, i.e. the serialized field holds
+     * only a type tag and no value bytes.
+     *
+     * @param tuple the input tuple to inspect
+     * @return true if the doc field is missing, false otherwise
+     */
+    private boolean isDocFieldMissing(ITupleReference tuple) {
+        return tuple.getFieldLength(docField) <= 1;
+    }
 }
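
The new isDocFieldMissing() check relies on a serialization convention: a MISSING
value occupies exactly one byte, its type tag, so a doc field of length <= 1 carries
no text to tokenize. A self-contained sketch of that convention; the class name and
the tag byte below are made-up placeholders, not AsterixDB's actual serialized
MISSING tag.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hyracks.api.dataflow.value.IMissingWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public class MissingFieldSketch {
        private static final byte PLACEHOLDER_MISSING_TYPE_TAG = 0x25; // hypothetical tag value

        // Writes nothing but a type tag, mirroring what the missing writer
        // contributes to the tuple builder in writeTuple() above.
        static final IMissingWriter MISSING_WRITER = new IMissingWriter() {
            @Override
            public void writeMissing(DataOutput out) throws HyracksDataException {
                try {
                    out.writeByte(PLACEHOLDER_MISSING_TYPE_TAG);
                } catch (IOException e) {
                    throw HyracksDataException.create(e);
                }
            }
        };

        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream fieldBytes = new ByteArrayOutputStream();
            MISSING_WRITER.writeMissing(new DataOutputStream(fieldBytes));
            // Mirrors the length test in isDocFieldMissing(): <= 1 byte means MISSING.
            System.out.println("missing? " + (fieldBytes.size() <= 1)); // prints: missing? true
        }
    }

In the operator itself the writer comes from the IMissingWriterFactory constructor
argument, so the actual tag byte is whatever that factory emits.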

