asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [03/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:45 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 679379b..da54e32 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -30,7 +30,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
@@ -50,7 +49,6 @@ import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveVal
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
@@ -58,7 +56,6 @@ import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescript
 import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -66,7 +63,6 @@ import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -83,10 +79,10 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
@@ -97,7 +93,6 @@ import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
@@ -334,110 +329,31 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
         try {
-            switch (((AqlDataSource) dataSource).getDatasourceType()) {
-                case FEED:
-                    return buildFeedCollectRuntime(jobSpec, (FeedDataSource) dataSource);
-                case INTERNAL_DATASET: {
-                    // querying an internal dataset
-                    return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
-                            typeEnv, dataSource, context, implConfig);
-                }
-                case EXTERNAL_DATASET: {
-                    // querying an external dataset
-                    Dataset dataset = ((DatasetDataSource) dataSource).getDataset();
-                    String itemTypeName = dataset.getItemTypeName();
-                    IAType itemType = MetadataManager.INSTANCE
-                            .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-
-                    ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
-                    IAdapterFactory adapterFactory = getConfiguredAdapterFactory(dataset, edd.getAdapter(),
-                            edd.getProperties(), (ARecordType) itemType, false, null, null);
-                    return buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
-                            NonTaggedDataFormat.INSTANCE);
-                }
-                case LOADABLE: {
-                    // This is a load into dataset operation
-                    LoadableDataSource alds = (LoadableDataSource) dataSource;
-                    List<List<String>> partitioningKeys = alds.getPartitioningKeys();
-                    boolean isPKAutoGenerated =
-                            ((InternalDatasetDetails) alds.getTargetDataset().getDatasetDetails()).isAutogenerated();
-                    ARecordType itemType = (ARecordType) alds.getLoadedType();
-                    int pkIndex = 0;
-                    IAdapterFactory adapterFactory =
-                            getConfiguredAdapterFactory(alds.getTargetDataset(), alds.getAdapter(),
-                                    alds.getAdapterProperties(), itemType, isPKAutoGenerated, partitioningKeys, null);
-                    RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-                    return buildLoadableDatasetScan(jobSpec, alds, adapterFactory, rDesc, isPKAutoGenerated,
-                            partitioningKeys, itemType, pkIndex);
-                }
-                default: {
-                    throw new IllegalArgumentException();
-                }
-
-            }
+            return ((AqlDataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
+                    projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
+                    implConfig);
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedCollectRuntime(JobSpecification jobSpec,
-            FeedDataSource feedDataSource) throws AlgebricksException {
-
-        try {
-            ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType();
-            ISerializerDeserializer payloadSerde =
-                    NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType);
-            IAType metaType = feedDataSource.getMetaItemType();
-            List<IAType> pkTypes = feedDataSource.getPkTypes();
-            ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
-            serdes.add(payloadSerde);
-            if (metaType != null) {
-                serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType));
-            }
-            if (pkTypes != null) {
-                for (IAType type : pkTypes) {
-                    serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
-                }
-            }
-            RecordDescriptor feedDesc =
-                    new RecordDescriptor(serdes.toArray(new ISerializerDeserializer[serdes.size()]));
-            FeedPolicyEntity feedPolicy =
-                    (FeedPolicyEntity) feedDataSource.getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-            if (feedPolicy == null) {
-                throw new AlgebricksException("Feed not configured with a policy");
-            }
-            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-            FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
-                    feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
-            FeedCollectOperatorDescriptor feedCollector =
-                    new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, feedDataSource.getSourceFeedId(),
-                            feedOutputType, feedDesc, feedPolicy.getProperties(), feedDataSource.getLocation());
-
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedCollector,
-                    determineLocationConstraint(feedDataSource));
-
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    private AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
+    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
             throws AsterixException {
         return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
-            LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
-        ExternalDataScanOperatorDescriptor dataScanner =
-                new ExternalDataScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
+            JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
+            throws AlgebricksException {
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
+                adapterFactory);
         AlgebricksPartitionConstraint constraint;
         try {
             constraint = adapterFactory.getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
+        return new Pair<>(dataScanner, constraint);
     }
 
     public IDataFormat getDataFormat(String dataverseName) throws AsterixException {
@@ -451,40 +367,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return format;
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
-            List<LogicalVariable> outputVars, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
-            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, IDataSource<AqlSourceId> dataSource,
-            JobGenContext context, Object implConfig) throws AlgebricksException, MetadataException {
-        AqlSourceId asid = dataSource.getId();
-        String dataverseName = asid.getDataverseName();
-        String datasetName = asid.getDatasourceName();
-        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
-
-        int[] minFilterFieldIndexes = null;
-        if (minFilterVars != null && !minFilterVars.isEmpty()) {
-            minFilterFieldIndexes = new int[minFilterVars.size()];
-            int i = 0;
-            for (LogicalVariable v : minFilterVars) {
-                minFilterFieldIndexes[i] = opSchema.findVariable(v);
-                i++;
-            }
-        }
-        int[] maxFilterFieldIndexes = null;
-        if (maxFilterVars != null && !maxFilterVars.isEmpty()) {
-            maxFilterFieldIndexes = new int[maxFilterVars.size()];
-            int i = 0;
-            for (LogicalVariable v : maxFilterVars) {
-                maxFilterFieldIndexes[i] = opSchema.findVariable(v);
-                i++;
-            }
-        }
-
-        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, context, true, false,
-                ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true, true,
-                implConfig, minFilterFieldIndexes, maxFilterFieldIndexes);
-    }
-
-    private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
             Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
             List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
         try {
@@ -493,9 +376,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     configuration, itemType, metaType);
 
             // check to see if dataset is indexed
-            Index filesIndex =
-                    MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(),
-                            dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
 
             if (filesIndex != null && filesIndex.getPendingOp() == 0) {
                 // get files
@@ -526,8 +409,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-        ExternalDataScanOperatorDescriptor dataScanner =
-                new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory);
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -542,8 +425,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
         Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
-        factoryOutput =
-                FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx, libraryManager);
+        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
+                libraryManager);
         ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
                 ExternalDataConstants.KEY_TYPE_NAME);
         IAdapterFactory adapterFactory = factoryOutput.first;
@@ -587,18 +470,19 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             ITypeTraits[] typeTraits;
             IBinaryComparatorFactory[] comparatorFactories;
 
-            ARecordType itemType =
-                    (ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
             ARecordType metaType = null;
             List<Integer> primaryKeyIndicators = null;
             if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
                 primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -612,11 +496,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     bloomFilterKeyFields[i] = i;
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
-                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(secondaryIndex.getIndexType(),
-                                secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
-                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
-                                dataset.hasMetaPart(), primaryKeyIndicators,
-                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+                        getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+                        secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
+                        secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
+                        dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
+                        secondaryIndex.getKeyFieldSourceIndicators(), metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -636,8 +520,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 // get meta item type
                 ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
                 typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
-                        context.getBinaryComparatorFactoryProvider());
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
+                        metaItemType, context.getBinaryComparatorFactoryProvider());
                 filterFields = DatasetUtils.createFilterFields(dataset);
                 btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
             }
@@ -673,8 +557,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                         : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
                                 txnSubsystemProvider, ResourceType.LSM_BTREE);
             }
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
             BTreeSearchOperatorDescriptor btreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
@@ -696,12 +580,13 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 // Be Careful of Key Start Index ?
                 int[] buddyBreeFields = new int[] { numSecondaryKeys };
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
-                        new ExternalBTreeWithBuddyDataflowHelperFactory(compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                        new ExternalBTreeWithBuddyDataflowHelperFactory(
+                        compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -730,12 +615,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
         int i = 0;
         for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
-                            (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+                    sidxKeyFieldNames.get(i),
+                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
             IAType keyType = keyPairType.first;
-            comparatorFactories[i] =
-                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -757,8 +642,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             } catch (AsterixException e) {
                 throw new AlgebricksException(e);
             }
-            comparatorFactories[i] =
-                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -771,7 +656,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
 
         try {
-            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
@@ -786,10 +672,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             int numSecondaryKeys = secondaryKeyFields.size();
             if (numSecondaryKeys != 1) {
                 throw new AlgebricksException("Cannot use " + numSecondaryKeys
-                        + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+                        + " fields as a key for the R-tree index. "
+                        + "There can be only one field as a key for the R-tree index.");
             }
-            Pair<IAType, Boolean> keyTypePair =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyFields.get(0), recType);
             IAType keyType = keyTypePair.first;
             if (keyType == null) {
                 throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
@@ -815,12 +702,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
-                            indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
             }
 
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
@@ -831,7 +718,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -845,15 +733,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             }
 
             IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
 
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
-                        getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
+                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
+                        comparatorFactories, primaryComparatorFactories);
                 IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                         valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -915,8 +803,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         File outFile = fs.getLocalFile().getFile();
         String nodeId = fs.getNodeName();
 
-        SinkWriterRuntimeFactory runtime =
-                new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc);
+        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+                getWriterFactory(), inputDesc);
         AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
         return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
     }
@@ -1045,8 +933,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
-                metaType =
-                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+                metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
+                        dataset.getMetaItemTypeName());
             }
 
             String itemTypeName = dataset.getItemTypeName();
@@ -1057,14 +945,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
-                            indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = DatasetUtils.createFilterFields(dataset);
             int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -1074,8 +963,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             // right callback
             // (ex. what's the expected behavior when there is an error during
             // bulkload?)
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
                     appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
@@ -1147,8 +1036,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
-                            indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1171,8 +1060,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
                             txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
                     new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
                     new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -1243,12 +1132,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         switch (secondaryIndex.getIndexType()) {
             case BTREE: {
-                return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+                return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
                         bulkload);
             }
             case RTREE: {
-                return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+                return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
                         bulkload);
             }
@@ -1469,8 +1358,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
-                            dataset.getDatasetDetails().isTemp());
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
 
             // Generate Output Record format
             ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1539,13 +1428,13 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             return null;
         }
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory filterEvalFactory =
-                expressionRuntimeProvider.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas, context);
+        IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
+                typeEnv, inputSchemas, context);
         return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
@@ -1602,7 +1491,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataset.getDatasetName(), indexName);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -1620,25 +1510,26 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
             IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
             for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType =
-                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), recType);
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                        secondaryKeyNames.get(i), recType);
                 IAType keyType = keyPairType.first;
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             }
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1651,8 +1542,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
                             dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
                     new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
                     new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -1792,8 +1683,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             IAType secondaryKeyType = null;
 
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
             secondaryKeyType = keyPairType.first;
 
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -1842,7 +1733,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1856,8 +1748,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                             ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory;
             if (!isPartitioned) {
                 indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
@@ -1899,7 +1791,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
@@ -1921,11 +1813,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataset.getDatasetName(), indexName);
             List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
             List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
-            boolean isPointMBR =
-                    spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
+                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -1960,8 +1852,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IPrimitiveValueProviderFactory[] valueProviderFactories =
                     new IPrimitiveValueProviderFactory[numSecondaryKeys];
             for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                        .getBinaryComparatorFactory(nestedKeyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
                 valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
             }
@@ -1976,14 +1868,16 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -2002,16 +1896,18 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
+                            indexOp, ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
+                            indexOp, ResourceType.LSM_RTREE,
                             dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
 
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
-                    getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
+                    primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                     new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2069,8 +1965,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             numElementsHint = Long.parseLong(numElementsHintString);
         }
         int numPartitions = 0;
-        List<String> nodeGroup =
-                MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
+        List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                .getNodeNames();
         for (String nd : nodeGroup) {
             numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
@@ -2112,8 +2008,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
-        String dv =
-                dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse;
+        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+                : dataverse;
         if (dv == null) {
             return null;
         }
@@ -2244,8 +2140,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
                     dataset.getDatasetName(),
                     dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
             // Create the operator
             ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
@@ -2319,8 +2215,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
-                            indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2331,7 +2227,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = DatasetUtils.createFilterFields(dataset);
             int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -2346,8 +2243,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
                     new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
                     new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -2356,24 +2253,24 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     btreeFields, filterFields, !temp);
             AsterixLSMTreeUpsertOperatorDescriptor op;
 
-            ITypeTraits[] outputTypeTraits =
-                    new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
             ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
                     + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
             for (int j = 0; j < recordDesc.getFieldCount(); j++) {
                 outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
                 outputSerDes[j] = recordDesc.getFields()[j];
             }
-            outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
-                    FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
-                    FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+            outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+                    .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+                    .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
 
             if (dataset.hasMetaPart()) {
-                outputSerDes[outputSerDes.length - 1 - numFilterFields] =
-                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] =
-                        FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+                        .getSerdeProvider().getSerializerDeserializer(metaItemType);
+                outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+                        .getTypeTraitProvider().getTypeTrait(metaItemType);
             }
 
             int fieldIdx = -1;
@@ -2399,7 +2296,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     searchCallbackFactory, null);
             op.setType(itemType);
             op.setFilterIndex(fieldIdx);
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+            return new Pair<>(op, splitsAndConstraint.second);
 
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -2433,7 +2330,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
         if (prevAdditionalFilteringKey != null) {
-            prevAdditionalFilteringKeys = new ArrayList<LogicalVariable>();
+            prevAdditionalFilteringKeys = new ArrayList<>();
             prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
         }
         switch (secondaryIndex.getIndexType()) {
@@ -2451,7 +2348,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+                return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema,
                         primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
                         secondaryIndex.getIndexType(), prevSecondaryKeys, prevAdditionalFilteringKeys);
             }
@@ -2464,7 +2361,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     //TODO: refactor this method
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexUpsertRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexType indexType,
@@ -2590,8 +2487,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             IAType secondaryKeyType = null;
 
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
             secondaryKeyType = keyPairType.first;
 
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -2615,7 +2512,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
 
             int[] filterFields = null;
@@ -2640,7 +2538,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2654,8 +2553,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
                             ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory;
             if (!isPartitioned) {
                 indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
@@ -2713,12 +2612,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
             List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
 
-            boolean isPointMBR =
-                    spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
+                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -2775,8 +2674,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IPrimitiveValueProviderFactory[] valueProviderFactories =
                     new IPrimitiveValueProviderFactory[numSecondaryKeys];
             for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                        .getBinaryComparatorFactory(nestedKeyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
                 valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
             }
@@ -2792,14 +2691,16 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -2824,10 +2725,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
                             ResourceType.LSM_RTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
-                    getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
+                    primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                     new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2926,7 +2827,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataset.getDatasetName(), indexName);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
+                    computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -2944,25 +2846,26 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
             IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
             for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType =
-                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), recType);
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                        secondaryKeyNames.get(i), recType);
                 IAType keyType = keyPairType.first;
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
+                        getBinaryComparatorFactory(keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             }
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
+                        getBinaryComparatorFactory(keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2976,8 +2879,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
                             ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
                     new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
                     new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -2989,7 +2892,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
                     idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
                     NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+            return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 4148d65..5a601bc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,14 +20,28 @@ package org.apache.asterix.metadata.declared;
 
 import java.util.List;
 
+import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
+import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
 
 public class DatasetDataSource extends AqlDataSource {
 
@@ -35,7 +49,7 @@ public class DatasetDataSource extends AqlDataSource {
 
     public DatasetDataSource(AqlSourceId id, Dataset dataset, IAType itemType, IAType metaItemType,
             AqlDataSourceType datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         super(id, itemType, metaItemType, datasourceType, datasetDomain);
         this.dataset = dataset;
         switch (dataset.getDatasetType()) {
@@ -57,8 +71,8 @@ public class DatasetDataSource extends AqlDataSource {
         InternalDatasetDetails internalDatasetDetails = (InternalDatasetDetails) datasetDetails;
         ARecordType recordType = (ARecordType) itemType;
         ARecordType metaRecordType = (ARecordType) metaItemType;
-        List<IAType> partitioningKeyTypes = KeyFieldTypeUtils.getPartitioningKeyTypes(internalDatasetDetails,
-                recordType, metaRecordType);
+        List<IAType> partitioningKeyTypes =
+                KeyFieldTypeUtils.getPartitioningKeyTypes(internalDatasetDetails, recordType, metaRecordType);
         int n = partitioningKeyTypes.size();
         schemaTypes = metaItemType == null ? new IAType[n + 1] : new IAType[n + 2];
         for (int keyIndex = 0; keyIndex < n; ++keyIndex) {
@@ -75,4 +89,56 @@ public class DatasetDataSource extends AqlDataSource {
         schemaTypes[0] = itemType;
     }
 
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+            AqlMetadataProvider aqlMetadataProvider, IDataSource<AqlSourceId> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
+            throws AlgebricksException {
+        switch (dataset.getDatasetType()) {
+            case EXTERNAL:
+                Dataset externalDataset = ((DatasetDataSource) dataSource).getDataset();
+                String itemTypeName = externalDataset.getItemTypeName();
+                IAType itemType = MetadataManager.INSTANCE.getDatatype(aqlMetadataProvider.getMetadataTxnContext(),
+                        externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+
+                ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails();
+                IAdapterFactory adapterFactory = aqlMetadataProvider.getConfiguredAdapterFactory(externalDataset,
+                        edd.getAdapter(), edd.getProperties(), (ARecordType) itemType, false, null, null);
+                return aqlMetadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                        NonTaggedDataFormat.INSTANCE);
+            case INTERNAL:
+                AqlSourceId asid = getId();
+                String dataverseName = asid.getDataverseName();
+                String datasetName = asid.getDatasourceName();
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(aqlMetadataProvider.getMetadataTxnContext(),
+                        dataverseName, datasetName, datasetName);
+
+                int[] minFilterFieldIndexes = null;
+                if (minFilterVars != null && !minFilterVars.isEmpty()) {
+                    minFilterFieldIndexes = new int[minFilterVars.size()];
+                    int i = 0;
+                    for (LogicalVariable v : minFilterVars) {
+                        minFilterFieldIndexes[i] = opSchema.findVariable(v);
+                        i++;
+                    }
+                }
+                int[] maxFilterFieldIndexes = null;
+                if (maxFilterVars != null && !maxFilterVars.isEmpty()) {
+                    maxFilterFieldIndexes = new int[maxFilterVars.size()];
+                    int i = 0;
+                    for (LogicalVariable v : maxFilterVars) {
+                        maxFilterFieldIndexes[i] = opSchema.findVariable(v);
+                        i++;
+                    }
+                }
+                return aqlMetadataProvider.buildBtreeRuntime(jobSpec, scanVariables, opSchema, typeEnv, context, true,
+                        false, ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null,
+                        true, true, implConfig, minFilterFieldIndexes, maxFilterFieldIndexes);
+            default:
+                throw new AlgebricksException("Unknown datasource type");
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 48f33ed..0a81f03 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -18,20 +18,39 @@
  */
 package org.apache.asterix.metadata.declared;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
 
-public class FeedDataSource extends AqlDataSource {
+public class FeedDataSource extends AqlDataSource implements IMutationDataSource {
 
     private final Feed feed;
     private final EntityId sourceFeedId;
@@ -113,6 +132,7 @@ public class FeedDataSource extends AqlDataSource {
         return pkTypes;
     }
 
+    @Override
     public List<ScalarFunctionCallExpression> getKeyAccessExpression() {
         return keyAccessExpression;
     }
@@ -127,10 +147,12 @@ public class FeedDataSource extends AqlDataSource {
         return dataScanVariables.get(0);
     }
 
+    @Override
     public boolean isChange() {
         return pkTypes != null;
     }
 
+    @Override
     public List<LogicalVariable> getPkVars(List<LogicalVariable> allVars) {
         if (pkTypes == null) {
             return null;
@@ -141,4 +163,45 @@ public class FeedDataSource extends AqlDataSource {
             return allVars.subList(1, allVars.size());
         }
     }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+            AqlMetadataProvider aqlMetadataProvider, IDataSource<AqlSourceId> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
+            throws AlgebricksException {
+        try {
+            ARecordType feedOutputType = (ARecordType) itemType;
+            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                    .getSerializerDeserializer(feedOutputType);
+            ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
+            serdes.add(payloadSerde);
+            if (metaItemType != null) {
+                serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaItemType));
+            }
+            if (pkTypes != null) {
+                for (IAType type : pkTypes) {
+                    serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
+                }
+            }
+            RecordDescriptor feedDesc = new RecordDescriptor(
+                    serdes.toArray(new ISerializerDeserializer[serdes.size()]));
+            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties()
+                    .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+            if (feedPolicy == null) {
+                throw new AlgebricksException("Feed not configured with a policy");
+            }
+            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+            FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(),
+                    getId().getDatasourceName(), getTargetDataset());
+            FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
+                    getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
+
+            return new Pair<>(feedCollector, new AlgebricksAbsolutePartitionConstraint(getLocations()));
+
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IMutationDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IMutationDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IMutationDataSource.java
new file mode 100644
index 0000000..2feb652
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IMutationDataSource.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+
+public interface IMutationDataSource {
+
+    boolean isChange();
+
+    List<LogicalVariable> getPkVars(List<LogicalVariable> allVars);
+
+    List<ScalarFunctionCallExpression> getKeyAccessExpression();
+
+}


Mime
View raw message