asterixdb-commits mailing list archives

From buyin...@apache.org
Subject [13/28] asterixdb git commit: Introduce IStorageComponentProvider
Date Thu, 02 Feb 2017 18:24:22 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
new file mode 100644
index 0000000..80792b5
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.CompactionPolicy;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class DatasetUtil {
+    private static final Logger LOGGER = Logger.getLogger(DatasetUtil.class.getName());
+    /*
+     * Dataset related operations
+     */
+    public static final byte OP_READ = 0x00;
+    public static final byte OP_INSERT = 0x01;
+    public static final byte OP_DELETE = 0x02;
+    public static final byte OP_UPSERT = 0x03;
+
+    private DatasetUtil() {
+    }
+
+    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
+            ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            throws AlgebricksException {
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            // Get comparators for RID fields.
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                try {
+                    bcfs[i] = IndexingConstants.getComparatorFactory(i);
+                } catch (AsterixException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        } else {
+            InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
+                        ? metaItemType.getSubFieldType(partitioningKeys.get(i))
+                        : itemType.getSubFieldType(partitioningKeys.get(i));
+                bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            }
+        }
+        return bcfs;
+    }
+
+    public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); ++i) {
+            bloomFilterKeyFields[i] = i;
+        }
+        return bloomFilterKeyFields;
+    }
+
+    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
+            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+            bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
+        }
+        return bhffs;
+    }
+
+    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
+        ITypeTraits[] typeTraits;
+        if (metaItemType != null) {
+            typeTraits = new ITypeTraits[numKeys + 2];
+            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                if (indicator.get(i) == 0) {
+                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                } else {
+                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
+                }
+                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        } else {
+            typeTraits = new ITypeTraits[numKeys + 1];
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        }
+        typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        return typeTraits;
+    }
+
+    public static List<List<String>> getPartitioningKeys(Dataset dataset) {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return IndexingConstants
+                    .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+        }
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+    }
+
+    public static List<String> getFilterField(Dataset dataset) {
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
+    }
+
+    public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
+            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
+        IAType type = itemType.getSubFieldType(filterField);
+        bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
+        return bcfs;
+    }
+
+    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        ITypeTraits[] typeTraits = new ITypeTraits[1];
+        IAType type = itemType.getSubFieldType(filterField);
+        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
+        return typeTraits;
+    }
+
+    public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
+
+        int[] filterFields = new int[1];
+        filterFields[0] = numKeys + 1;
+        return filterFields;
+    }
+
+    public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        int valueFields = dataset.hasMetaPart() ? 2 : 1;
+        int[] btreeFields = new int[partitioningKeys.size() + valueFields];
+        for (int i = 0; i < btreeFields.length; ++i) {
+            btreeFields[i] = i;
+        }
+        return btreeFields;
+    }
+
+    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+        String policyName = dataset.getCompactionPolicy();
+        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+                MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+        String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
+        ILSMMergePolicyFactory mergePolicyFactory;
+        try {
+            mergePolicyFactory =
+                    (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
+            if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
+                ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
+            }
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AlgebricksException(e);
+        }
+        Map<String, String> properties = dataset.getCompactionPolicyProperties();
+        return new Pair<>(mergePolicyFactory, properties);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
+            throws HyracksDataException {
+        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+        propertyRecordBuilder.reset(recordType);
+        AMutableString aString = new AMutableString("");
+        ISerializerDeserializer<AString> stringSerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(name);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(0, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(value);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(1, fieldValue);
+
+        propertyRecordBuilder.write(out, true);
+    }
+
+    public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        if (dataset.hasMetaPart()) {
+            return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
+                    dataset.getMetaItemTypeName());
+        }
+        return null;
+    }
+
+    public static JobSpecification createDropDatasetJobSpec(Dataset dataset, Index primaryIndex,
+            MetadataProvider metadataProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName();
+        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return RuntimeUtils.createJobSpecification();
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
+                        dataset.getDatasetName(), temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        IndexDropOperatorDescriptor primaryBtreeDrop =
+                new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+                splitsAndConstraint.second);
+        specPrimary.addRoot(primaryBtreeDrop);
+        return specPrimary;
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName());
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName, true);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, compactionInfo.first, compactionInfo.second);
+        IndexDropOperatorDescriptor btreeDrop =
+                new IndexDropOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        dataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+        return spec;
+    }
+
+    public static JobSpecification dropDatasetJobSpec(Dataset dataset, Index primaryIndex,
+            MetadataProvider metadataProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName();
+        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return RuntimeUtils.createJobSpecification();
+        }
+
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
+                        dataset.getDatasetName(), temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        IndexDropOperatorDescriptor primaryBtreeDrop =
+                new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+                splitsAndConstraint.second);
+
+        specPrimary.addRoot(primaryBtreeDrop);
+
+        return specPrimary;
+    }
+
+    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                datasetName, datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        // get meta item type
+        ARecordType metaItemType = null;
+        if (dataset.hasMetaPart()) {
+            metaItemType = (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
+                    dataset.getMetaItemTypeName());
+        }
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
+        int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
+
+        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtil.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+        FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < fs.length; i++) {
+            sb.append(fs[i]).append(' ');
+        }
+        LOGGER.info("CREATING File Splits: " + sb.toString());
+
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+        IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(typeTraits,
+                comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
+                compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
+                dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
+                storageComponentProvider.getMetadataPageManagerFactory());
+        ILocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
+        TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
+                storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
+                splitsAndConstraint.second);
+        spec.addRoot(indexCreateOp);
+        return spec;
+    }
+
+    public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
+            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
+        int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), datasetName, datasetName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                splitsAndConstraint.second);
+        spec.addRoot(compactOp);
+        return spec;
+    }
+}
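
For orientation, here is a minimal, hypothetical usage sketch of the new DatasetUtil entry points; it is not part of this commit. The caller context (a MetadataProvider, the target Dataverse, and a dataset name) is assumed, and only classes introduced or referenced in the diff above are used.

// Hypothetical caller: build the primary-index creation job for an internal dataset
// using the helpers introduced above. Identifiers not defined in this commit
// (metadataProvider, dataverse, datasetName) are assumed to come from the caller.
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.hyracks.api.job.JobSpecification;

public class DatasetUtilSketch {
    public static JobSpecification buildCreateJob(MetadataProvider metadataProvider,
            Dataverse dataverse, String datasetName) throws Exception {
        // Resolve the dataset first; createDatasetJobSpec performs the same lookup
        // internally and fails if the dataset does not exist.
        Dataset dataset = metadataProvider.findDataset(dataverse.getDataverseName(), datasetName);
        if (dataset == null) {
            throw new IllegalArgumentException("Unknown dataset: " + datasetName);
        }
        // The partitioning keys drive the comparator, type-trait, and bloom-filter
        // computations inside DatasetUtil; printing them here is purely illustrative.
        System.out.println("Partitioning keys: " + DatasetUtil.getPartitioningKeys(dataset));
        // Assemble the TreeIndexCreateOperatorDescriptor-based job for the primary index.
        return DatasetUtil.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
    }
}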

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
deleted file mode 100644
index 18baab1..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.utils;
-
-import java.io.DataOutput;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.CompactionPolicy;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-public class DatasetUtils {
-    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
-            ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
-            throws AlgebricksException {
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            // Get comparators for RID fields.
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                try {
-                    bcfs[i] = IndexingConstants.getComparatorFactory(i);
-                } catch (AsterixException e) {
-                    throw new AlgebricksException(e);
-                }
-            }
-        } else {
-            InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
-                        ? metaItemType.getSubFieldType(partitioningKeys.get(i))
-                        : itemType.getSubFieldType(partitioningKeys.get(i));
-                bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
-            }
-        }
-        return bcfs;
-    }
-
-    public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
-        for (int i = 0; i < partitioningKeys.size(); ++i) {
-            bloomFilterKeyFields[i] = i;
-        }
-        return bloomFilterKeyFields;
-    }
-
-    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
-            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
-        for (int i = 0; i < partitioningKeys.size(); i++) {
-            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-            bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
-        }
-        return bhffs;
-    }
-
-    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numKeys = partitioningKeys.size();
-        ITypeTraits[] typeTraits;
-        if (metaItemType != null) {
-            typeTraits = new ITypeTraits[numKeys + 2];
-            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-            typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
-            for (int i = 0; i < numKeys; i++) {
-                IAType keyType;
-                if (indicator.get(i) == 0) {
-                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-                } else {
-                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
-                }
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-        } else {
-            typeTraits = new ITypeTraits[numKeys + 1];
-            for (int i = 0; i < numKeys; i++) {
-                IAType keyType;
-                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-        }
-        typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        return typeTraits;
-    }
-
-    public static List<List<String>> getPartitioningKeys(Dataset dataset) {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
-        }
-        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
-    }
-
-    public static List<String> getFilterField(Dataset dataset) {
-        return (((InternalDatasetDetails) dataset.getDatasetDetails())).getFilterField();
-    }
-
-    public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
-            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
-        IAType type = itemType.getSubFieldType(filterField);
-        bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
-        return bcfs;
-    }
-
-    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        ITypeTraits[] typeTraits = new ITypeTraits[1];
-        IAType type = itemType.getSubFieldType(filterField);
-        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
-        return typeTraits;
-    }
-
-    public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numKeys = partitioningKeys.size();
-
-        int[] filterFields = new int[1];
-        filterFields[0] = numKeys + 1;
-        return filterFields;
-    }
-
-    public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        int valueFields = dataset.hasMetaPart() ? 2 : 1;
-        int[] btreeFields = new int[partitioningKeys.size() + valueFields];
-        for (int i = 0; i < btreeFields.length; ++i) {
-            btreeFields[i] = i;
-        }
-        return btreeFields;
-    }
-
-    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        for (int i = 0; i < partitioningKeys.size(); i++) {
-            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-    public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
-        String policyName = dataset.getCompactionPolicy();
-        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
-        String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory;
-        try {
-            mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
-            if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
-                ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
-            }
-        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-            throw new AlgebricksException(e);
-        }
-        Map<String, String> properties = dataset.getCompactionPolicyProperties();
-        return new Pair<ILSMMergePolicyFactory, Map<String, String>>(mergePolicyFactory, properties);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
-            throws HyracksDataException {
-        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        propertyRecordBuilder.reset(recordType);
-        AMutableString aString = new AMutableString("");
-        ISerializerDeserializer<AString> stringSerde =
-                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
-
-        // write field 0
-        fieldValue.reset();
-        aString.setValue(name);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(0, fieldValue);
-
-        // write field 1
-        fieldValue.reset();
-        aString.setValue(value);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(1, fieldValue);
-
-        propertyRecordBuilder.write(out, true);
-    }
-
-    public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset)
-            throws AlgebricksException {
-        if (dataset.hasMetaPart()) {
-            return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
-                    dataset.getMetaItemTypeName());
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
index 17a3ebe..2e35ed4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
@@ -34,11 +34,11 @@ import org.apache.asterix.metadata.entities.Dataset;
  * @author alamouda
  */
 public class ExternalDatasetsRegistry {
-    public static ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
-    private ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
+    public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
+    private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
 
     private ExternalDatasetsRegistry() {
-        globalRegister = new ConcurrentHashMap<String, ExternalDatasetAccessManager>();
+        globalRegister = new ConcurrentHashMap<>();
     }
 
     /**
@@ -59,12 +59,12 @@ public class ExternalDatasetsRegistry {
 
     public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) {
 
-        Map<String, Integer> locks = null;
+        Map<String, Integer> locks;
         String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName();
         // check first if the lock was aquired already
         locks = metadataProvider.getLocks();
         if (locks == null) {
-            locks = new HashMap<String, Integer>();
+            locks = new HashMap<>();
             metadataProvider.setLocks(locks);
         } else {
             // if dataset was accessed already by this job, return the registered version
@@ -130,7 +130,10 @@ public class ExternalDatasetsRegistry {
             // if dataset was accessed already by this job, return the registered version
             Set<Entry<String, Integer>> aquiredLocks = locks.entrySet();
             for (Entry<String, Integer> entry : aquiredLocks) {
-                globalRegister.get(entry.getKey()).queryEnd(entry.getValue());
+                ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey());
+                if (accessManager != null) {
+                    accessManager.queryEnd(entry.getValue());
+                }
             }
             locks.clear();
         }
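
As a hedged illustration of how this registry is meant to be used (not part of this diff): a job over an external dataset pins a files-index version up front and notifies the registry when the query ends. The release method name below is assumed; the hunk above shows only the internal loop that calls queryEnd(...) for each lock held by the job.

import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;

public class ExternalDatasetLockingSketch {
    public static void runPinned(Dataset dataset, MetadataProvider metadataProvider) {
        // Pin the current files-index version so a concurrent refresh cannot change
        // the snapshot this query reads.
        int version = ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider);
        try {
            // ... compile and run the query against the pinned `version` ...
        } finally {
            // Assumed release entry point: it would run the queryEnd(...) loop shown
            // above for every lock registered by this job, then clear the locks.
            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
        }
    }
}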

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
new file mode 100644
index 0000000..249f035
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class ExternalIndexingOperations {
+    private static final Logger LOGGER = Logger.getLogger(ExternalIndexingOperations.class.getName());
+    public static final List<List<String>> FILE_INDEX_FIELD_NAMES =
+            Collections.unmodifiableList(Collections.singletonList(Collections.singletonList("")));
+    public static final List<IAType> FILE_INDEX_FIELD_TYPES =
+            Collections.unmodifiableList(Collections.singletonList(BuiltinType.ASTRING));
+
+    private ExternalIndexingOperations() {
+    }
+
+    public static boolean isIndexible(ExternalDatasetDetails ds) {
+        String adapter = ds.getAdapter();
+        if (adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER)) {
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
+        return ds.getState() != TransactionState.COMMIT;
+    }
+
+    public static boolean isValidIndexName(String datasetName, String indexName) {
+        return !datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName);
+    }
+
+    public static int getRIDSize(Dataset dataset) {
+        ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
+        ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        return IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
+        return IndexingConstants.getBuddyBtreeComparatorFactories();
+    }
+
+    public static List<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<>();
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        try {
+            // Create the file system object
+            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
+            // Get paths of dataset
+            String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
+            String[] paths = path.split(",");
+
+            // Add fileStatuses to files
+            for (String aPath : paths) {
+                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
+                for (int i = 0; i < fileStatuses.length; i++) {
+                    int nextFileNumber = files.size();
+                    handleFile(dataset, files, fs, fileStatuses[i], nextFileNumber);
+                }
+            }
+            // Close file system
+            fs.close();
+            if (files.isEmpty()) {
+                throw new AlgebricksException("File Snapshot retrieved from external file system is empty");
+            }
+            return files;
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Exception while trying to get snapshot from external system", e);
+            throw new AlgebricksException("Unable to get list of HDFS files " + e);
+        }
+    }
+
+    private static void handleFile(Dataset dataset, List<ExternalFile> files, FileSystem fs, FileStatus fileStatus,
+            int nextFileNumber) throws IOException {
+        if (fileStatus.isDirectory()) {
+            listSubFiles(dataset, fs, fileStatus, files);
+        } else {
+            files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                    fileStatus.getPath().toUri().getPath(), new Date(fileStatus.getModificationTime()),
+                    fileStatus.getLen(), ExternalFilePendingOp.NO_OP));
+        }
+    }
+
+    /*
+     * Lists all files under the given directory; src is expected to be a folder.
+     */
+    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, List<ExternalFile> files)
+            throws IOException {
+        Path path = src.getPath();
+        FileStatus[] fileStatuses = srcFs.listStatus(path);
+        for (int i = 0; i < fileStatuses.length; i++) {
+            int nextFileNumber = files.size();
+            if (fileStatuses[i].isDirectory()) {
+                listSubFiles(dataset, srcFs, fileStatuses[i], files);
+            } else {
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
+                        fileStatuses[i].getLen(), ExternalFilePendingOp.NO_OP));
+            }
+        }
+    }
+
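+    /**
+     * Creates an HDFS FileSystem handle from the dataset properties; the map is expected to hold
+     * the file system URI under ExternalDataConstants.KEY_HDFS_URL.
+     */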
+    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
+        Configuration conf = new Configuration();
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName());
+        return FileSystem.get(conf);
+    }
+
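+    /**
+     * Builds a job that writes the given snapshot of external files into the dataset's files
+     * index, creating the index first when createIndex is true.
+     */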
+    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
+            List<ExternalFile> externalFilesSnapshot, MetadataProvider metadataProvider, boolean createIndex)
+            throws AlgebricksException {
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IResourceFactory localResourceMetadata = new ExternalBTreeLocalResourceMetadataFactory(
+                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, false, dataset.getDatasetId(),
+                mergePolicyFactory, mergePolicyFactoryProperties, dataset.getIndexOperationTrackerFactory(fileIndex),
+                dataset.getIoOperationCallbackFactory(fileIndex),
+                storageComponentProvider.getMetadataPageManagerFactory());
+        PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.ExternalBTreeResource);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        ExternalFilesIndexOperatorDescriptor externalFilesOp =
+                new ExternalFilesIndexOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
+                        dataflowHelperFactory, localResourceFactoryProvider, externalFilesSnapshot, createIndex,
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
+                secondarySplitsAndConstraint.second);
+        spec.addRoot(externalFilesOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    /**
+     * Creates an indexing operator that indexes records residing in HDFS.
+     *
+     * @param metadataProvider the metadata provider used to obtain the indexing adapter factory
+     * @param jobSpec the job specification to which the operator belongs
+     * @param itemType the record type of the external dataset
+     * @param dataset the external dataset to be indexed
+     * @param files the external files to index
+     * @param indexerDesc the record descriptor produced by the indexing operator
+     * @return a pair of the external scan operator and its partition constraint
+     * @throws HyracksDataException
+     * @throws AlgebricksException
+     */
+    private static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> getIndexingOperator(
+            MetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
+            List<ExternalFile> files, RecordDescriptor indexerDesc) throws HyracksDataException, AlgebricksException {
+        ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        Map<String, String> configuration = externalDatasetDetails.getProperties();
+        IAdapterFactory adapterFactory =
+                AdapterFactoryProvider.getIndexingAdapterFactory(metadataProvider.getLibraryManager(),
+                        externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null);
+        return new Pair<>(new ExternalScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+                adapterFactory.getPartitionConstraint());
+    }
+
+    public static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+            JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
+            RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException {
+        return getIndexingOperator(metadataProvider, spec, itemType, dataset,
+                files == null ? MetadataManager.INSTANCE
+                        .getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset) : files,
+                indexerDesc);
+    }
+
+    /**
+     * At the end of this method, we expect to have four sets as follows:
+     * metadataFiles should contain only the files that were appended, in their original state;
+     * addedFiles should contain new files, with numbers assigned starting after the maximum original file number;
+     * deletedFiles should contain files that no longer exist in the file system;
+     * appendedFiles should contain the new file information of existing files that have grown.
+     * The method returns false in case of a non-zero delta, i.e., true only when the dataset is up to date.
+     *
+     * @param dataset the external dataset being checked
+     * @param metadataFiles the files currently recorded in the metadata
+     * @param addedFiles output list of newly added files
+     * @param deletedFiles output list of deleted files
+     * @param appendedFiles output list of appended files
+     * @return true if the dataset is up to date, false otherwise
+     * @throws AlgebricksException
+     */
+    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
+            throws AlgebricksException {
+        boolean uptodate = true;
+        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
+
+        List<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
+
+        // Loop over file system files (taking care of added files)
+        for (ExternalFile fileSystemFile : fileSystemFiles) {
+            boolean fileFound = false;
+            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+            while (mdFilesIterator.hasNext()) {
+                ExternalFile metadataFile = mdFilesIterator.next();
+                if (!fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
+                    continue;
+                }
+                // Same file name
+                if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
+                    // Same timestamp
+                    if (fileSystemFile.getSize() == metadataFile.getSize()) {
+                        // Same size -> no op
+                        mdFilesIterator.remove();
+                        fileFound = true;
+                    } else {
+                        // Different size -> append op
+                        metadataFile.setPendingOp(ExternalFilePendingOp.APPEND_OP);
+                        fileSystemFile.setPendingOp(ExternalFilePendingOp.APPEND_OP);
+                        appendedFiles.add(fileSystemFile);
+                        fileFound = true;
+                        uptodate = false;
+                    }
+                } else {
+                    // Same file name, Different file mod date -> delete and add
+                    metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP);
+                    deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                            0, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), metadataFile.getSize(),
+                            ExternalFilePendingOp.DROP_OP));
+                    fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP);
+                    fileSystemFile.setFileNumber(newFileNumber);
+                    addedFiles.add(fileSystemFile);
+                    newFileNumber++;
+                    fileFound = true;
+                    uptodate = false;
+                }
+                if (fileFound) {
+                    break;
+                }
+            }
+            if (!fileFound) {
+                // File not stored previously in metadata -> pending add op
+                fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP);
+                fileSystemFile.setFileNumber(newFileNumber);
+                addedFiles.add(fileSystemFile);
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+
+        // Done with files from external file system -> metadata files now contain both deleted files and appended ones
+        // first, correct number assignment to deleted and updated files
+        for (ExternalFile deletedFile : deletedFiles) {
+            deletedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+        for (ExternalFile appendedFile : appendedFiles) {
+            appendedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+
+        // include the remaining deleted files
+        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+        while (mdFilesIterator.hasNext()) {
+            ExternalFile metadataFile = mdFilesIterator.next();
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.NO_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP);
+                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                        metadataFile.getSize(), metadataFile.getPendingOp()));
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+        return uptodate;
+    }
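+    // Illustrative usage sketch (local variable names below are hypothetical): the deltas computed
+    // here typically drive a dataset refresh along these lines:
+    //   List<ExternalFile> metadataFiles = MetadataManager.INSTANCE
+    //           .getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
+    //   List<ExternalFile> added = new ArrayList<>();
+    //   List<ExternalFile> deleted = new ArrayList<>();
+    //   List<ExternalFile> appended = new ArrayList<>();
+    //   if (!isDatasetUptodate(dataset, metadataFiles, added, deleted, appended)) {
+    //       // build and run buildFilesIndexUpdateOp(...) and buildIndexUpdateOp(...) for each index,
+    //       // then buildCommitJob(...) to commit the refresh (or buildAbortOp(...) to roll it back).
+    //   }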
+
+    public static Dataset createTransactionDataset(Dataset dataset) {
+        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
+                originalDsd.getTimestamp(), TransactionState.BEGIN);
+        return new Dataset(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getItemTypeDataverseName(),
+                dataset.getItemTypeName(), dataset.getNodeGroupName(), dataset.getCompactionPolicy(),
+                dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), DatasetType.EXTERNAL,
+                dataset.getDatasetId(), dataset.getPendingOp());
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName());
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName, true);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, compactionInfo.first, compactionInfo.second);
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
+                metadataProvider.getStorageComponentProvider().getStorageManager(),
+                metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(),
+                splitsAndConstraint.first, dataflowHelperFactory,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+
+        return spec;
+    }
+
+    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<>();
+        for (ExternalFile file : metadataFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                files.add(file);
+            } else if (file.getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+                for (ExternalFile appendedFile : appendedFiles) {
+                    if (appendedFile.getFileName().equals(file.getFileName())) {
+                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(),
+                                file.getFileNumber(), file.getFileName(), file.getLastModefiedTime(),
+                                appendedFile.getSize(), ExternalFilePendingOp.NO_OP));
+                    }
+                }
+            }
+        }
+        files.addAll(addedFiles);
+        Collections.sort(files);
+        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
+    }
+
+    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        // Create files list
+        ArrayList<ExternalFile> files = new ArrayList<>();
+
+        for (ExternalFile metadataFile : metadataFiles) {
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.NO_OP);
+            }
+            files.add(metadataFile);
+        }
+        // add new and appended files
+        files.addAll(addedFiles);
+        files.addAll(appendedFiles);
+        return IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, null, null, null, null, metadataProvider, files);
+    }
+
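+    /**
+     * Builds a job that runs the commit operator over the dataset's files index and all of its
+     * valid secondary indexes.
+     */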
+    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        boolean temp = ds.getDatasetDetails().isTemp();
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+
+        boolean temp = ds.getDatasetDetails().isTemp();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        boolean temp = ds.getDatasetDetails().isTemp();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, MetadataProvider metadataProvider,
+            IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE, storageComponentProvider.getMetadataPageManagerFactory());
+        spec.addRoot(compactOp);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondarySplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static boolean isFileIndex(Index index) {
+        return index.getIndexName().equals(IndexingConstants.getFilesIndexName(index.getDatasetName()));
+    }
+}

