asterixdb-commits mailing list archives

From: ti...@apache.org
Subject: [4/7] asterixdb git commit: ASTERIXDB-1711: remove some Aql-prefixes
Date: Thu, 24 Nov 2016 00:57:31 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42600592/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
deleted file mode 100644
index f6eeb66..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ /dev/null
@@ -1,2255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.AsterixStorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
-import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.algebricks.data.IPrinterFactory;
-import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-
-public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
-
-    private final AsterixStorageProperties storageProperties;
-    private final ILibraryManager libraryManager;
-    private final Dataverse defaultDataverse;
-
-    private MetadataTransactionContext mdTxnCtx;
-    private boolean isWriteTransaction;
-    private Map<String, String> config;
-    private IAWriterFactory writerFactory;
-    private FileSplit outputFile;
-    private boolean asyncResults;
-    private ResultSetId resultSetId;
-    private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
-    private JobId jobId;
-    private Map<String, Integer> locks;
-    private boolean isTemporaryDatasetWriteJob = true;
-
-    public AqlMetadataProvider(Dataverse defaultDataverse) {
-        this.defaultDataverse = defaultDataverse;
-        this.storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
-        this.libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
-    }
-
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public void setConfig(Map<String, String> config) {
-        this.config = config;
-    }
-
-    public Map<String, String> getConfig() {
-        return config;
-    }
-
-    public ILibraryManager getLibraryManager() {
-        return libraryManager;
-    }
-
-    public void setJobId(JobId jobId) {
-        this.jobId = jobId;
-    }
-
-    public Dataverse getDefaultDataverse() {
-        return defaultDataverse;
-    }
-
-    public String getDefaultDataverseName() {
-        return defaultDataverse == null ? null : defaultDataverse.getDataverseName();
-    }
-
-    public void setWriteTransaction(boolean writeTransaction) {
-        this.isWriteTransaction = writeTransaction;
-    }
-
-    public void setWriterFactory(IAWriterFactory writerFactory) {
-        this.writerFactory = writerFactory;
-    }
-
-    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
-        this.mdTxnCtx = mdTxnCtx;
-    }
-
-    public MetadataTransactionContext getMetadataTxnContext() {
-        return mdTxnCtx;
-    }
-
-    public IAWriterFactory getWriterFactory() {
-        return this.writerFactory;
-    }
-
-    public FileSplit getOutputFile() {
-        return outputFile;
-    }
-
-    public void setOutputFile(FileSplit outputFile) {
-        this.outputFile = outputFile;
-    }
-
-    public boolean getResultAsyncMode() {
-        return asyncResults;
-    }
-
-    public void setResultAsyncMode(boolean asyncResults) {
-        this.asyncResults = asyncResults;
-    }
-
-    public ResultSetId getResultSetId() {
-        return resultSetId;
-    }
-
-    public void setResultSetId(ResultSetId resultSetId) {
-        this.resultSetId = resultSetId;
-    }
-
-    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
-        this.resultSerializerFactoryProvider = rafp;
-    }
-
-    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
-        return resultSerializerFactoryProvider;
-    }
-
-    public boolean isWriteTransaction() {
-        // The transaction writes persistent datasets.
-        return isWriteTransaction;
-    }
-
-    public boolean isTemporaryDatasetWriteJob() {
-        // The transaction only writes temporary datasets.
-        return isTemporaryDatasetWriteJob;
-    }
-
-    public IDataFormat getFormat() {
-        return FormatUtils.getDefaultFormat();
-    }
-
-    public AsterixStorageProperties getStorageProperties() {
-        return storageProperties;
-    }
-
-    public Map<String, Integer> getLocks() {
-        return locks;
-    }
-
-    public void setLocks(Map<String, Integer> locks) {
-        this.locks = locks;
-    }
-
-    /**
-     * Retrieve the Output RecordType, as defined by "set output-record-type".
-     */
-    public ARecordType findOutputRecordType() throws AlgebricksException {
-        return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
-                getPropertyValue("output-record-type"));
-    }
-
-    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
-        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
-                : dataverse;
-        if (dv == null) {
-            return null;
-        }
-        return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
-    }
-
-    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
-        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
-    }
-
-    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
-        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
-    }
-
-    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
-        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
-    }
-
-    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
-        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
-    }
-
-    @Override
-    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
-        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
-    }
-
-    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException {
-        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
-    }
-
-    @Override
-    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
-            throws AlgebricksException {
-        AqlDataSource ads = findDataSource(dataSourceId);
-        Dataset dataset = ((DatasetDataSource) ads).getDataset();
-        try {
-            String indexName = indexId;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex != null) {
-                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-            } else {
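-                // Fall back to the primary index, which by convention shares the dataset's name.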
-                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), dataset.getDatasetName());
-                if (primaryIndex.getIndexName().equals(indexId)) {
-                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-                } else {
-                    return null;
-                }
-            }
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
-        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-    }
-
-    @Override
-    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
-        return AsterixBuiltinFunctions.lookupFunction(fid);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
-            IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
-            List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
-            List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
-        try {
-            return ((AqlDataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
-                    projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
-                    implConfig);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
-        return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
-    }
-
-    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
-            JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
-            throws AlgebricksException {
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
-                adapterFactory);
-        AlgebricksPartitionConstraint constraint;
-        try {
-            constraint = adapterFactory.getPartitionConstraint();
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        return new Pair<>(dataScanner, constraint);
-    }
-
-    public IDataFormat getDataFormat(String dataverseName) throws AsterixException {
-        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        IDataFormat format;
-        try {
-            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-        return format;
-    }
-
-    public Dataverse findDataverse(String dataverseName) throws AsterixException {
-        return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-    }
-
-    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
-            JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
-        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
-                libraryManager);
-        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
-                ExternalDataConstants.KEY_TYPE_NAME);
-        IAdapterFactory adapterFactory = factoryOutput.first;
-        FeedIntakeOperatorDescriptor feedIngestor = null;
-        switch (factoryOutput.third) {
-            case INTERNAL:
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType,
-                        policyAccessor, factoryOutput.second);
-                break;
-            case EXTERNAL:
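-                // External adapter names embed the library name before the separator; extract it.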
-                String libraryName = primaryFeed.getAdapterName().trim()
-                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
-                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
-                break;
-        }
-
-        AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
-        return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
-            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
-            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
-            Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-        boolean isSecondary = true;
-        int numSecondaryKeys = 0;
-        try {
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
-                isSecondary = !indexName.equals(primaryIndex.getIndexName());
-            }
-            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            int[] bloomFilterKeyFields;
-            ITypeTraits[] typeTraits;
-            IBinaryComparatorFactory[] comparatorFactories;
-
-            ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
-            ARecordType metaType = null;
-            List<Integer> primaryKeyIndicators = null;
-            if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
-                primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
-
-            if (isSecondary) {
-                Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), indexName);
-                numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
-                bloomFilterKeyFields = new int[numSecondaryKeys];
-                for (int i = 0; i < numSecondaryKeys; i++) {
-                    bloomFilterKeyFields[i] = i;
-                }
-                Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
-                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                                secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
-                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
-                                dataset.hasMetaPart(), primaryKeyIndicators,
-                                secondaryIndex.getKeyFieldSourceIndicators(),
-                                metaType);
-                comparatorFactories = comparatorFactoriesAndTypeTraits.first;
-                typeTraits = comparatorFactoriesAndTypeTraits.second;
-                if (filterTypeTraits != null) {
-                    filterFields = new int[1];
-                    filterFields[0] = numSecondaryKeys + numPrimaryKeys;
-                    btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-                    for (int k = 0; k < btreeFields.length; k++) {
-                        btreeFields[k] = k;
-                    }
-                }
-
-            } else {
-                bloomFilterKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    bloomFilterKeyFields[i] = i;
-                }
-                // get meta item type
-                ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
-                        context.getBinaryComparatorFactoryProvider());
-                filterFields = DatasetUtils.createFilterFields(dataset);
-                btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-            }
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
-                    indexName, temp);
-
-            ISearchOperationCallbackFactory searchCallbackFactory;
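-            // Temporary datasets skip locking (no-op callbacks); others get index-appropriate search callbacks.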
-            if (isSecondary) {
-                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                        : new SecondaryIndexSearchOperationCallbackFactory();
-            } else {
-                JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
-                int datasetId = dataset.getDatasetId();
-                int[] primaryKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    primaryKeyFields[i] = i;
-                }
-
-                ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-
-                /**
-                 * Due to the read-committed isolation level,
-                 * we may acquire a very short-duration lock (i.e., an instant lock) for readers.
-                 */
-                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                        : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                                txnSubsystemProvider, ResourceType.LSM_BTREE);
-            }
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-            BTreeSearchOperatorDescriptor btreeSearchOp;
-            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
-                        lowKeyInclusive, highKeyInclusive,
-                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                                compactionInfo.first, compactionInfo.second,
-                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
-                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, !temp),
-                        retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
-                        minFilterFieldIndexes, maxFilterFieldIndexes);
-            } else {
-                // External dataset: use the B-tree with buddy B-tree.
-                // Be careful with the key start index.
-                int[] buddyBTreeFields = new int[] { numSecondaryKeys };
-                ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
-                        new ExternalBTreeWithBuddyDataflowHelperFactory(
-                                compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBTreeFields,
-                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
-                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
-                        rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
-                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
-            }
-            return new Pair<>(btreeSearchOp, spPc.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
-            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
-            int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-        try {
-            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex == null) {
-                throw new AlgebricksException(
-                        "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
-            }
-            List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            int numSecondaryKeys = secondaryKeyFields.size();
-            if (numSecondaryKeys != 1) {
-                throw new AlgebricksException(
-                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
-                                + "There can be only one field as a key for the R-tree index.");
-            }
-            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyFields.get(0), recType);
-            IAType keyType = keyTypePair.first;
-            if (keyType == null) {
-                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-            }
-            int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-            boolean isPointMBR = keyType.getTypeTag() == ATypeTag.POINT || keyType.getTypeTag() == ATypeTag.POINT3D;
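-            // An R-tree key covers an MBR: two nested fields (min and max) per dimension.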
-            int numNestedSecondaryKeyFields = numDimensions * 2;
-            IPrimitiveValueProviderFactory[] valueProviderFactories =
-                    new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-            for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-            }
-
-            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            // TODO: isn't the variable below (keysStartIndex) always 0?
-            int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
-            if (retainInput) {
-                keysStartIndex -= numNestedSecondaryKeyFields;
-            }
-            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-                    outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
-            ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
-                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
-                            dataset.getDatasetName(), indexName, temp);
-            ARecordType metaType = null;
-            if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
-            }
-
-            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
-            int[] btreeFields = new int[primaryComparatorFactories.length];
-            for (int i = 0; i < btreeFields.length; i++) {
-                btreeFields[i] = i + numNestedSecondaryKeyFields;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] rtreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
-                rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-                for (int i = 0; i < rtreeFields.length; i++) {
-                    rtreeFields[i] = i;
-                }
-            }
-
-            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                    : new SecondaryIndexSearchOperationCallbackFactory();
-
-            RTreeSearchOperatorDescriptor rtreeSearchOp;
-            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
-                        comparatorFactories, primaryComparatorFactories);
-                IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                        valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
-                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                        AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
-                        rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
-                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, keyFields, idff, retainInput, retainMissing,
-                        context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes);
-            } else {
-                // External Dataset
-                ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(
-                        valueProviderFactories, RTreePolicyType.RTREE,
-                        IndexingConstants.getBuddyBtreeComparatorFactories(), compactionInfo.first,
-                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                        proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
-                        getStorageProperties().getBloomFilterFalsePositiveRate(),
-                        new int[] { numNestedSecondaryKeyFields },
-                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp, isPointMBR);
-                // Create the operator
-                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
-            }
-
-            return new Pair<>(rtreeSearchOp, spPc.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    @Override
-    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
-        FileSplitDataSink fsds = (FileSplitDataSink) sink;
-        FileSplitSinkId fssi = fsds.getId();
-        FileSplit fs = fssi.getFileSplit();
-        File outFile = fs.getLocalFile().getFile();
-        String nodeId = fs.getNodeName();
-
-        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
-                getWriterFactory(), inputDesc);
-        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<>(runtime, apc);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
-            JobSpecification spec) throws AlgebricksException {
-        ResultSetDataSink rsds = (ResultSetDataSink) sink;
-        ResultSetSinkId rssId = rsds.getId();
-        ResultSetId rsId = rssId.getResultSetId();
-        ResultWriterOperatorDescriptor resultWriter = null;
-        try {
-            IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
-                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
-            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
-                    resultSerializedAppenderFactory);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-        return new Pair<>(resultWriter, null);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        String dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys + 1] = idx;
-        }
-
-        try {
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-
-            ARecordType metaType = null;
-            if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
-            }
-
-            String itemTypeName = dataset.getItemTypeName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaType, context.getBinaryComparatorFactoryProvider());
-
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
-                            datasetName, indexName, temp);
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-
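-            // Per-partition cardinality hint, used to size the bloom filter for the bulk load.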
-            long numElementsHint = getCardinalityPerPartitionHint(dataset);
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            // TODO: figure out the right bulk-load behavior and then supply the
-            // right callback (e.g., what is the expected behavior when an error
-            // occurs during bulk load?)
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
-                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
-                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                            compactionInfo.first, compactionInfo.second,
-                            new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                            storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                            filterCmpFactories, btreeFields, filterFields, !temp));
-            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, boolean bulkload) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, false, null);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
-            boolean bulkload) throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
-                context, spec, bulkload, null, null);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
-                context, spec, false, null, null);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
-            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
-                context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
-
-        String indexName = dataSourceIndex.getId();
-        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
-        IOperatorSchema inputSchema;
-        if (inputSchemas.length > 0) {
-            inputSchema = inputSchemas[0];
-        } else {
-            throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
-        }
-
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        Index secondaryIndex;
-        try {
-            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        // TokenizeOperator only supports a keyword or n-gram index.
-        switch (secondaryIndex.getIndexType()) {
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX:
-                return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
-                        primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
-            default:
-                throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
-                        + secondaryIndex.getIndexType());
-        }
-    }
-
-    /**
-     * Calculate an estimated size for the bloom filter. Note that this
-     * estimate assumes the data is uniformly distributed across all
-     * partitions.
-     *
-     * @param dataset
-     * @return Number of elements that will be used to create a bloom filter per
-     *         dataset per partition
-     * @throws MetadataException
-     * @throws AlgebricksException
-     */
-    public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException {
-        String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME);
-        long numElementsHint;
-        if (numElementsHintString == null) {
-            numElementsHint = DatasetCardinalityHint.DEFAULT;
-        } else {
-            numElementsHint = Long.parseLong(numElementsHintString);
-        }
-        int numPartitions = 0;
-        List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                .getNodeNames();
-        for (String nd : nodeGroup) {
-            numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
-        }
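-        // Illustrative arithmetic: a cardinality hint of 1,000,000 on a node
-        // group of 2 nodes with 2 partitions each gives numPartitions = 4, so
-        // each partition's bloom filter is sized for 1,000,000 / 4 = 250,000
-        // elements.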
-        numElementsHint = numElementsHint / numPartitions;
-        return numElementsHint;
-    }
-
-    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
-            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
-        try {
-            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
-            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
-                    configuration, itemType, metaType);
-
-            // check to see if dataset is indexed
-            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
-            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
-                // get files
-                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
-                Iterator<ExternalFile> iterator = files.iterator();
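-                // keep only files whose metadata operation has completed
-                // (i.e. pendingOp == PENDING_NO_OP)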
-                while (iterator.hasNext()) {
-                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
-                        iterator.remove();
-                    }
-                }
-            }
-
-            return adapterFactory;
-        } catch (Exception e) {
-            throw new AlgebricksException("Unable to create adapter", e);
-        }
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
-            throws AlgebricksException {
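-        // numKeyFields / 2: an R-tree key stores a (min, max) coordinate pair
-        // per dimension, so the linearizer is built over the point dimensionality.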
-        return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
-                numKeyFields / 2);
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
-            String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
-            String dataverse) {
-        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
-    }
-
-    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
-            String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-    }
-
-    public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
-            throws MetadataException {
-        DatasourceAdapter adapter;
-        // search in default namespace (built-in adapter)
-        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
-
-        // search in dataverse (user-defined adapter)
-        if (adapter == null) {
-            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
-        }
-        return adapter;
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
-        return ClusterStateManager.INSTANCE.getClusterLocations();
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
-                datasetName, targetIdxName, create);
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
-            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
-            JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainMissing)
-            throws AlgebricksException {
-        try {
-            // Get data type
-            IAType itemType;
-            itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
-
-            // Create the adapter factory. Right now there is only one; if more
-            // are added in the future, we can create a map.
-            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
-                    datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing,
-                    context.getMissingWriterFactory());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
-            try {
-                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
-            } catch (MetadataException e) {
-                throw new AlgebricksException("Unable to create merge policy factory for external dataset", e);
-            }
-
-            boolean temp = datasetDetails.isTemp();
-            // Create the file index data flow helper
-            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                    compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
-                    ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
-
-            // Create the out record descriptor, appContext and fileSplitProvider for the files index
-            RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                    : new SecondaryIndexSearchOperationCallbackFactory();
-            // Create the operator
-            ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
-                    outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
-                    appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
-                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
-                    retainMissing, context.getMissingWriterFactory());
-            return new Pair<>(op, spPc.second);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
-            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException(
-                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = primaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
-        // Move key fields to front. [keys, record, filters]
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        // set the keys' permutations
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = inputSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        // set the record permutation
-        fieldPermutation[i++] = inputSchema.findVariable(payload);
-        // set the filters' permutations.
-        if (numFilterFields > 0) {
-            int idx = inputSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[i++] = idx;
-        }
-
-        if (additionalNonFilterFields != null) {
-            for (LogicalVariable var : additionalNonFilterFields) {
-                int idx = inputSchema.findVariable(var);
-                fieldPermutation[i++] = idx;
-            }
-        }
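-        // Illustrative example: with two primary keys at input positions 3 and
-        // 4, the payload at 0, and one filter field at 5, fieldPermutation ends
-        // up as [3, 4, 0, 5], i.e. the [keys, record, filters] layout noted above.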
-
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-
-            String itemTypeName = dataset.getItemTypeName();
-            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
-                            indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
-                    : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
-            LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
-                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            AsterixLSMTreeUpsertOperatorDescriptor op;
-
-            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
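-            // The upsert output prepends the previous record (plus the previous
-            // meta and filter values, when present) to the incoming fields;
-            // hence the extra (hasMetaPart ? 2 : 1) + numFilterFields slots.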
-
-            // add the previous record first
-            int f = 0;
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            f++;
-            // add the previous meta second
-            if (dataset.hasMetaPart()) {
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
-                        metaItemType);
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
-                f++;
-            }
-            // add the previous filter third
-            int fieldIdx = -1;
-            if (numFilterFields > 0) {
-                String filterField = DatasetUtils.getFilterField(dataset).get(0);
-                for (i = 0; i < itemType.getFieldNames().length; i++) {
-                    if (itemType.getFieldNames()[i].equals(filterField)) {
-                        break;
-                    }
-                }
-                fieldIdx = i;
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
-                        .getFieldTypes()[fieldIdx]);
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
-                f++;
-            }
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j + f] = recordDesc.getFields()[j];
-            }
-
-            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
-                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                    idfh, null, true, indexName, context.getMissingWriterFactory(), modificationCallbackFactory,
-                    searchCallbackFactory, null);
-            op.setType(itemType);
-            op.setFilterIndex(fieldIdx);
-            return new Pair<>(op, splitsAndConstraint.second);
-
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
-            throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
-
-        ISerializerDeserializer<?> payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
-                adapterFactory);
-
-        AlgebricksPartitionConstraint constraint;
-        try {
-            constraint = adapterFactory.getPartitionConstraint();
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
-        return new Pair<>(dataScanner, constraint);
-    }
-
-    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-            List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames,
-            ARecordType recType, DatasetType dsType, boolean hasMeta, List<Integer> primaryIndexKeyIndicators,
-            List<Integer> secondaryIndexIndicators, ARecordType metaType) throws AlgebricksException {
-
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        int sidxKeyFieldCount = sidxKeyFieldNames.size();
-        int pidxKeyFieldCount = pidxKeyFieldNames.size();
-        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
-        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
-
-        int i = 0;
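-        // Secondary-index key fields come first, then the primary-index keys.
-        // An indicator value of 1 marks a key field sourced from the meta
-        // record rather than the record itself.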
-        for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
-                    sidxKeyFieldNames.get(i),
-                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
-            IAType keyType = keyPairType.first;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
-            IAType keyType = null;
-            try {
-                switch (dsType) {
-                    case INTERNAL:
-                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
-                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
-                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
-                        break;
-                    case EXTERNAL:
-                        keyType = IndexingConstants.getFieldType(j);
-                        break;
-                    default:
-                        throw new AlgebricksException("Unknown dataset type: " + dsType);
-                }
-            } catch (AsterixException e) {
-                throw new AlgebricksException(e);
-            }
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        return new Pair<>(comparatorFactories, typeTraits);
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload,
-            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
-
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(),
-                datasetName);
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-        // Move key fields to front.
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
-                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[i++] = idx;
-        }
-        if (additionalNonFilteringFields != null) {
-            for (LogicalVariable variable : additionalNonFilteringFields) {
-                int idx = propagatedSchema.findVariable(variable);
-                fieldPermutation[i++] = idx;
-            }
-        }
-
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
-                            indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
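-            // Bulk load fills the tree directly at DEFAULT_TREE_FILL_FACTOR,
-            // sized by the per-partition cardinality hint; the regular path uses
-            // the LSM insert/delete operator with transactional callbacks.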
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
-            }
-            return new Pair<>(op, splitsAndConstraint.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
-            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        Index secondaryIndex;
-        try {
-            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-
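-        // For UPSERT, wrap the single previous filtering key in a list to match
-        // the List-typed parameter expected by the per-index-type helpers below.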
-        ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
-        if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
-            prevAdditionalFilteringKeys = new ArrayList<>();
-            prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
-        }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        switch (secondaryIndex.getIndexType()) {
-            case BTREE:
-                return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
-            case RTREE:
-                return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX:
-                return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
-            default:
-                throw new AlgebricksException(indexOp.name()
-                        + " (insert, upsert, or delete) not implemented for index type: "
-                        + secondaryIndex.getIndexType());
-        }
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-        int i = 0;
-        int j = 0;
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        for (LogicalVariabl

<TRUNCATED>
