asterixdb-commits mailing list archives

From buyin...@apache.org
Subject [25/28] asterixdb git commit: Introduce IStorageComponentProvider
Date Thu, 02 Feb 2017 18:24:34 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
deleted file mode 100644
index 95ce100..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.cc;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.algebra.base.ILangExtension;
-import org.apache.asterix.algebra.base.ILangExtension.Language;
-import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.api.ExtensionId;
-import org.apache.asterix.common.api.IExtension;
-import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
-import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * AsterixDB's implementation of {@code IAlgebraExtensionManager}, which takes care of
- * initializing extensions for application and compilation purposes.
- */
-public class CompilerExtensionManager implements IAlgebraExtensionManager {
-
-    private final Map<ExtensionId, IExtension> extensions = new HashMap<>();
-
-    private final IStatementExecutorExtension statementExecutorExtension;
-    private final ILangCompilationProvider aqlCompilationProvider;
-    private final ILangCompilationProvider sqlppCompilationProvider;
-    private final DefaultStatementExecutorFactory defaultQueryTranslatorFactory;
-
-    /**
-     * Initialize {@code CompilerExtensionManager} from configuration
-     *
-     * @param list
-     * @throws InstantiationException
-     * @throws IllegalAccessException
-     * @throws ClassNotFoundException
-     * @throws HyracksDataException
-     */
-    public CompilerExtensionManager(List<AsterixExtension> list)
-            throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException {
-        Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
-        Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
-        IStatementExecutorExtension see = null;
-        defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
-
-        if (list != null) {
-            for (AsterixExtension extensionConf : list) {
-                IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance();
-                extension.configure(extensionConf.getArgs());
-                if (extensions.containsKey(extension.getId())) {
-                    throw new RuntimeDataException(ErrorCode.EXTENSION_ID_CONFLICT, extension.getId());
-                }
-                extensions.put(extension.getId(), extension);
-                switch (extension.getExtensionKind()) {
-                    case STATEMENT_EXECUTOR:
-                        see = extendStatementExecutor(see, (IStatementExecutorExtension) extension);
-                        break;
-                    case LANG:
-                        ILangExtension le = (ILangExtension) extension;
-                        aqlcp = extendLangCompilationProvider(Language.AQL, aqlcp, le);
-                        sqlppcp = extendLangCompilationProvider(Language.SQLPP, sqlppcp, le);
-                        break;
-                    default:
-                        break;
-                }
-            }
-        }
-        this.statementExecutorExtension = see;
-        this.aqlCompilationProvider = aqlcp == null ? new AqlCompilationProvider() : aqlcp.second;
-        this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second;
-    }
-
-    private Pair<ExtensionId, ILangCompilationProvider> extendLangCompilationProvider(Language lang,
-            Pair<ExtensionId, ILangCompilationProvider> cp, ILangExtension le) throws HyracksDataException {
-        if (cp != null && le.getLangCompilationProvider(lang) != null) {
-            throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, le.getId(), cp.first,
-                    lang.toString());
-        }
-        return (le.getLangCompilationProvider(lang) != null)
-                ? new Pair<>(le.getId(), le.getLangCompilationProvider(lang)) : cp;
-    }
-
-    private IStatementExecutorExtension extendStatementExecutor(IStatementExecutorExtension qte,
-            IStatementExecutorExtension extension) throws HyracksDataException {
-        if (qte != null) {
-            throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, qte.getId(), extension.getId(),
-                    IStatementExecutorFactory.class.getSimpleName());
-        }
-        return extension;
-    }
-
-    public IStatementExecutorFactory getQueryTranslatorFactory() {
-        return statementExecutorExtension == null ? defaultQueryTranslatorFactory
-                : statementExecutorExtension.getQueryTranslatorFactory();
-    }
-
-    public ILangCompilationProvider getAqlCompilationProvider() {
-        return aqlCompilationProvider;
-    }
-
-    public ILangCompilationProvider getSqlppCompilationProvider() {
-        return sqlppCompilationProvider;
-    }
-}

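The deleted CompilerExtensionManager boils down to a reflective loading loop with a duplicate-id guard, falling back to the built-in AQL and SQL++ providers when no extension supplies one. A minimal standalone sketch of that loading pattern, using hypothetical stand-in types rather than the actual AsterixDB interfaces:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-ins; only the load/conflict pattern mirrors the deleted class.
    public class ExtensionLoadingSketch {

        interface Extension {
            String id();
            void configure(Map<String, String> args);
        }

        private final Map<String, Extension> extensions = new HashMap<>();

        // Instantiate each configured class reflectively, configure it, and
        // reject duplicate extension ids, as the deleted constructor did.
        public void load(Iterable<String> classNames) throws ReflectiveOperationException {
            for (String className : classNames) {
                Extension extension = (Extension) Class.forName(className)
                        .getDeclaredConstructor().newInstance();
                extension.configure(Map.of());
                if (extensions.putIfAbsent(extension.id(), extension) != null) {
                    throw new IllegalStateException("Extension id conflict: " + extension.id());
                }
            }
        }
    }

The real constructor additionally routes each extension by kind (STATEMENT_EXECUTOR vs. LANG) and raises ErrorCode.EXTENSION_COMPONENT_CONFLICT when two extensions claim the same compilation slot.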
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
index a4b2345..f7b6842 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
@@ -25,6 +25,7 @@ import org.apache.asterix.translator.IStatementExecutorFactory;
  * An interface for extensions of {@code IStatementExecutor}
  */
 public interface IStatementExecutorExtension extends IExtension {
+
     @Override
     default ExtensionKind getExtensionKind() {
         return ExtensionKind.STATEMENT_EXECUTOR;

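The hunk above only adds whitespace, but the interface it touches shows the pattern these extension families rely on: the base interface declares the kind, and each family pins it with a default method so implementors only supply their factory. A hypothetical sketch of that shape (illustrative names, not the actual AsterixDB types):

    interface ExtensionSketch {
        enum Kind { STATEMENT_EXECUTOR, LANG }
        Kind getKind();
    }

    interface StatementExecutorExtensionSketch extends ExtensionSketch {
        @Override
        default Kind getKind() {
            return Kind.STATEMENT_EXECUTOR; // pinned once for the whole family
        }

        Object getQueryTranslatorFactory(); // stand-in for IStatementExecutorFactory
    }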
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
index 7b01169..f43092b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
@@ -23,7 +23,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ResourceIdManager implements IResourceIdManager {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
deleted file mode 100644
index a2518ec..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.transactions.IResourceFactory;
-import org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
-import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
-import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.file.IndexOperations;
-import org.apache.asterix.file.JobSpecificationUtils;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
-public class ExternalIndexingOperations {
-
-    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = Collections
-            .singletonList(Collections.singletonList(""));
-    public static final List<IAType> FILE_INDEX_FIELD_TYPES = Collections.singletonList(BuiltinType.ASTRING);
-
-    private ExternalIndexingOperations() {
-    }
-
-    public static boolean isIndexible(ExternalDatasetDetails ds) {
-        String adapter = ds.getAdapter();
-        if (adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER)) {
-            return true;
-        }
-        return false;
-    }
-
-    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
-        return ds.getState() != ExternalDatasetTransactionState.COMMIT;
-    }
-
-    public static boolean isValidIndexName(String datasetName, String indexName) {
-        return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName));
-    }
-
-    public static String getFilesIndexName(String datasetName) {
-        return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX);
-    }
-
-    public static int getRIDSize(Dataset dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
-        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
-    }
-
-    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
-        return IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
-    }
-
-    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
-        return IndexingConstants.getBuddyBtreeComparatorFactories();
-    }
-
-    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
-            throws AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<>();
-        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        try {
-            // Create the file system object
-            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
-            // Get paths of dataset
-            String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
-            String[] paths = path.split(",");
-
-            // Add fileStatuses to files
-            for (String aPath : paths) {
-                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
-                for (int i = 0; i < fileStatuses.length; i++) {
-                    int nextFileNumber = files.size();
-                    if (fileStatuses[i].isDirectory()) {
-                        listSubFiles(dataset, fs, fileStatuses[i], files);
-                    } else {
-                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
-                                fileStatuses[i].getPath().toUri().getPath(),
-                                new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-            // Close file system
-            fs.close();
-            if (files.size() == 0) {
-                throw new AlgebricksException("File Snapshot retrieved from external file system is empty");
-            }
-            return files;
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("Unable to get list of HDFS files " + e);
-        }
-    }
-
-    /* List all files under the directory.
-     * src is expected to be a folder.
-     */
-    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList<ExternalFile> files)
-            throws IOException {
-        Path path = src.getPath();
-        FileStatus[] fileStatuses = srcFs.listStatus(path);
-        for (int i = 0; i < fileStatuses.length; i++) {
-            int nextFileNumber = files.size();
-            if (fileStatuses[i].isDirectory()) {
-                listSubFiles(dataset, srcFs, fileStatuses[i], files);
-            } else {
-                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
-                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
-                        fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP));
-            }
-        }
-    }
-
-    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
-        Configuration conf = new Configuration();
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName());
-        return FileSystem.get(conf);
-    }
-
-    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
-            ArrayList<ExternalFile> externalFilesSnapshot, MetadataProvider metadataProvider, boolean createIndex)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-        IResourceFactory localResourceMetadata = new ExternalBTreeLocalResourceMetadataFactory(
-                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
-                new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
-        PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
-                new PersistentLocalResourceFactoryProvider(
-                        localResourceMetadata, LocalResource.ExternalBTreeResource);
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
-        ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider,
-                externalFilesSnapshot, createIndex, LSMIndexUtil.getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
-                secondarySplitsAndConstraint.second);
-        spec.addRoot(externalFilesOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    /**
-     * This method creates an indexing operator that indexes records in HDFS.
-     *
-     * @param jobSpec
-     * @param itemType
-     * @param dataset
-     * @param files
-     * @param indexerDesc
-     * @return
-     * @throws AlgebricksException
-     * @throws HyracksDataException
-     * @throws Exception
-     */
-    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>
-            getExternalDataIndexingOperator(
-                    MetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
-                    List<ExternalFile> files, RecordDescriptor indexerDesc)
-                    throws HyracksDataException, AlgebricksException {
-        ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        Map<String, String> configuration = externalDatasetDetails.getProperties();
-        IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
-                metadataProvider.getLibraryManager(), externalDatasetDetails.getAdapter(), configuration,
-                (ARecordType) itemType, files, true, null);
-        return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
-                adapterFactory.getPartitionConstraint());
-    }
-
-    public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
-            JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
-            RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException {
-        if (files == null) {
-            files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
-        }
-        return getExternalDataIndexingOperator(metadataProvider, spec, itemType, dataset, files, indexerDesc);
-    }
-
-    /**
-     * At the end of this method, we expect to have 4 sets as follows:
-     * metadataFiles should contain only the files that were appended, in their original state
-     * addedFiles should contain new files, with file numbers assigned starting after the max original file number
-     * deletedFiles should contain files that are no longer present in the file system
-     * appendedFiles should contain the new file information for files whose size changed
-     * The method returns true in case of zero delta (i.e., the dataset is up to date)
-     *
-     * @param dataset
-     * @param metadataFiles
-     * @param addedFiles
-     * @param deletedFiles
-     * @param appendedFiles
-     * @return
-     * @throws MetadataException
-     * @throws AlgebricksException
-     */
-    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
-            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
-            throws MetadataException, AlgebricksException {
-        boolean uptodate = true;
-        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
-
-        ArrayList<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
-
-        // Loop over file system files (taking care of added files)
-        for (ExternalFile fileSystemFile : fileSystemFiles) {
-            boolean fileFound = false;
-            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-            while (mdFilesIterator.hasNext()) {
-                ExternalFile metadataFile = mdFilesIterator.next();
-                if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
-                    // Same file name
-                    if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
-                        // Same timestamp
-                        if (fileSystemFile.getSize() == metadataFile.getSize()) {
-                            // Same size -> no op
-                            mdFilesIterator.remove();
-                            fileFound = true;
-                        } else {
-                            // Different size -> append op
-                            metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            appendedFiles.add(fileSystemFile);
-                            fileFound = true;
-                            uptodate = false;
-                        }
-                    } else {
-                        // Same file name, Different file mod date -> delete and add
-                        metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                        deletedFiles
-                                .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0,
-                                        metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
-                                        metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
-                        fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                        fileSystemFile.setFileNumber(newFileNumber);
-                        addedFiles.add(fileSystemFile);
-                        newFileNumber++;
-                        fileFound = true;
-                        uptodate = false;
-                    }
-                }
-                if (fileFound) {
-                    break;
-                }
-            }
-            if (!fileFound) {
-                // File not stored previously in metadata -> pending add op
-                fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                fileSystemFile.setFileNumber(newFileNumber);
-                addedFiles.add(fileSystemFile);
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-
-        // Done with files from external file system -> metadata files now contain both deleted files and appended ones
-        // first, correct number assignment to deleted and updated files
-        for (ExternalFile deletedFile : deletedFiles) {
-            deletedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-        for (ExternalFile appendedFile : appendedFiles) {
-            appendedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-
-        // include the remaining deleted files
-        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-        while (mdFilesIterator.hasNext()) {
-            ExternalFile metadataFile = mdFilesIterator.next();
-            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
-                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
-                        metadataFile.getSize(), metadataFile.getPendingOp()));
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-        return uptodate;
-    }
-
-    public static Dataset createTransactionDataset(Dataset dataset) {
-        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
-                originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN);
-        Dataset transactionDatset = new Dataset(dataset.getDataverseName(), dataset.getDatasetName(),
-                dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), dataset.getNodeGroupName(),
-                dataset.getCompactionPolicy(), dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(),
-                DatasetType.EXTERNAL, dataset.getDatasetId(), dataset.getPendingOp());
-        return transactionDatset;
-    }
-
-    public static boolean isFileIndex(Index index) {
-        return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName())));
-    }
-
-    public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
-            MetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
-        String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
-                : indexDropStmt.getDataverseName();
-        String datasetName = indexDropStmt.getDatasetName();
-        String indexName = indexDropStmt.getIndexName();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
-        StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp),
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
-                splitsAndConstraint.second);
-        spec.addRoot(btreeDrop);
-
-        return spec;
-    }
-
-    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            MetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<>();
-        for (ExternalFile file : metadataFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                files.add(file);
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                for (ExternalFile appendedFile : appendedFiles) {
-                    if (appendedFile.getFileName().equals(file.getFileName())) {
-                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(),
-                                file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-        }
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        Collections.sort(files);
-        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
-    }
-
-    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
-        // Create files list
-        ArrayList<ExternalFile> files = new ArrayList<>();
-
-        for (ExternalFile metadataFile : metadataFiles) {
-            if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) {
-                files.add(metadataFile);
-            } else {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-                files.add(metadataFile);
-            }
-        }
-        // add new files
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        // add appended files
-        for (ExternalFile file : appendedFiles) {
-            files.add(file);
-        }
-
-        CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
-                index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, null, null, metadataProvider, files);
-    }
-
-    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            StorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            StorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            StorageProperties storageProperties, MetadataProvider metadataProvider, JobSpecification spec)
-            throws AlgebricksException, AsterixException {
-        int numPrimaryKeys = getRIDSize(ds);
-        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
-        secondaryKeyFields.size();
-        ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getItemTypeDataverseName(),
-                ds.getItemTypeName());
-        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType spatialType = spatialTypePair.first;
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-        }
-        boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
-        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = numDimensions * 2;
-        IPrimitiveValueProviderFactory[] valueProviderFactories =
-                new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories =
-                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        ATypeTag keyType = nestedKeyType.getTypeTag();
-
-        keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = SerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-
-            secondaryComparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(nestedKeyType, true);
-            secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE;
-        }
-        // Add serializers and comparators for primary index fields.
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
-        }
-        int[] primaryKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
-        }
-
-        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true, isPointMBR);
-    }
-
-    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-
-    }
-
-    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, MetadataProvider metadataProvider)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
-        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory());
-        spec.addRoot(compactOp);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
-                secondarySplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-}

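Both builders removed above (external index recovery and files-index compaction) share one job-construction skeleton: create a JobSpecification, resolve the file splits and partition constraint for the target index, add a single operator as the job's root, pin it to those partitions, and install the connector policy. A minimal sketch of that skeleton follows; buildMaintenanceOp is a hypothetical stand-in for the concrete operator descriptor (recover, compact, etc.), not an API in this codebase.

    // Sketch only: the shared skeleton of the removed job builders.
    // buildMaintenanceOp is a hypothetical placeholder for the concrete
    // operator descriptor wired into the job.
    private static JobSpecification buildMaintenanceJob(Dataset dataset, MetadataProvider metadataProvider)
            throws MetadataException, AlgebricksException {
        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
                        dataset.getDatasetName(), getFilesIndexName(dataset.getDatasetName()), true);
        IOperatorDescriptor op = buildMaintenanceOp(spec, dataset, splitsAndConstraint.first); // hypothetical
        spec.addRoot(op);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, splitsAndConstraint.second);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
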
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 95fe68c..4680465 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -43,11 +43,11 @@ import org.apache.asterix.external.library.LibraryAdapter;
 import org.apache.asterix.external.library.LibraryFunction;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Library;
+import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class ExternalLibraryUtils {
@@ -81,7 +81,7 @@ public class ExternalLibraryUtils {
                         // get library file
                         File libraryDir = new File(installLibDir.getAbsolutePath() + File.separator + dataverse
                                 + File.separator + library);
-                        // install if needed (i.e., add the functions, adapters, datasources, parsers to the metadata) <Not required for use>
+                        // install if needed (i.e., add the functions, adapters, datasources, parsers to the metadata)
                         installLibraryIfNeeded(dataverse, libraryDir, uninstalledLibs);
                     }
                 }
@@ -96,7 +96,7 @@ public class ExternalLibraryUtils {
      * @throws Exception
      */
     private static Map<String, List<String>> uninstallLibraries() throws Exception {
-        Map<String, List<String>> uninstalledLibs = new HashMap<String, List<String>>();
+        Map<String, List<String>> uninstalledLibs = new HashMap<>();
         // get the directory of the libraries to be uninstalled
         File uninstallLibDir = getLibraryUninstallDir();
         String[] uninstallLibNames;
@@ -116,7 +116,7 @@ public class ExternalLibraryUtils {
                 // add the library to the list of uninstalled libraries
                 List<String> uinstalledLibsInDv = uninstalledLibs.get(dataverse);
                 if (uinstalledLibsInDv == null) {
-                    uinstalledLibsInDv = new ArrayList<String>();
+                    uinstalledLibsInDv = new ArrayList<>();
                     uninstalledLibs.put(dataverse, uinstalledLibsInDv);
                 }
                 uinstalledLibsInDv.add(libName);
@@ -172,7 +172,8 @@ public class ExternalLibraryUtils {
                 // belong to the library?
                 if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) {
                     // remove adapter <! we didn't check if there are feeds which use this adapter>
-                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
+                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse,
+                            adapter.getAdapterIdentifier().getName());
                 }
             }
             // drop the library itself
@@ -203,7 +204,8 @@ public class ExternalLibraryUtils {
             Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
             if (libraryInMetadata != null && !wasUninstalled) {
                 // exists in metadata and was not uninstalled, so we return.
-                // Another place which shows that our metadata transactions are broken (we didn't call commit before!!!)
+                // Another place which shows that our metadata transactions are broken
+                // (we didn't call commit before!!!)
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return;
             }
@@ -235,13 +237,13 @@ public class ExternalLibraryUtils {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
             if (dv == null) {
                 MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverse,
-                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, IMetadataEntity.PENDING_NO_OP));
+                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, MetadataUtil.PENDING_NO_OP));
             }
             // Add functions
             if (library.getLibraryFunctions() != null) {
                 for (LibraryFunction function : library.getLibraryFunctions().getLibraryFunction()) {
                     String[] fargs = function.getArguments().trim().split(",");
-                    List<String> args = new ArrayList<String>();
+                    List<String> args = new ArrayList<>();
                     for (String arg : fargs) {
                         args.add(arg);
                     }

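One further cleanup the hunk at @@ -116 stops short of, assuming the Java 8 baseline that this commit's diamond operators already imply: the get/null-check/put bookkeeping for uninstalledLibs collapses into a single Map.computeIfAbsent call.

    // Equivalent to the get/null-check/put sequence above (a sketch, not part
    // of this commit): create the per-dataverse list on first use, then append.
    uninstalledLibs.computeIfAbsent(dataverse, k -> new ArrayList<>()).add(libName);
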
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
deleted file mode 100644
index 34746d3..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.file.JobSpecificationUtils;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-
-/**
- * Provides helper method(s) for creating JobSpec for operations on a feed.
- */
-public class FeedOperations {
-
-    /**
-     * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
-     *
-     * @param primaryFeed
-     * @param metadataProvider
-     * @return JobSpecification the Hyracks job specification for receiving data from external source
-     * @throws Exception
-     */
-    public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
-            MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
-        IAdapterFactory adapterFactory = null;
-        IOperatorDescriptor feedIngestor;
-        AlgebricksPartitionConstraint ingesterPc;
-        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
-                metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
-        feedIngestor = t.first;
-        ingesterPc = t.second;
-        adapterFactory = t.third;
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
-        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
-        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
-        spec.addRoot(nullSink);
-        return new Pair<>(spec, adapterFactory);
-    }
-
-    /**
-     * Builds the job spec for sending a message to an active feed to disconnect it from
-     * its source.
-     */
-    public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(MetadataProvider metadataProvider,
-            FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
-
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IOperatorDescriptor feedMessenger;
-        AlgebricksPartitionConstraint messengerPc;
-        List<String> locations = null;
-        FeedRuntimeType sourceRuntimeType;
-        try {
-            FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
-                    .getActiveEntityListener(connectionId.getFeedId());
-            FeedConnectJobInfo cInfo = listener.getFeedConnectJobInfo(connectionId);
-            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
-            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
-
-            boolean terminateIntakeJob = false;
-            boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
-            if (completeDisconnect) {
-                sourceRuntimeType = FeedRuntimeType.INTAKE;
-                locations = cInfo.getCollectLocations();
-                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
-            } else {
-                locations = cInfo.getComputeLocations();
-                sourceRuntimeType = FeedRuntimeType.COMPUTE;
-            }
-
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
-                    connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
-
-            feedMessenger = p.first;
-            messengerPc = p.second;
-
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
-            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
-            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
-            spec.addRoot(nullSink);
-            return new Pair<>(spec, terminateIntakeJob);
-
-        } catch (AlgebricksException e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, IActiveMessage feedMessage,
-            Collection<String> locations) throws AlgebricksException {
-        AlgebricksPartitionConstraint partitionConstraint =
-                new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        FeedMessageOperatorDescriptor feedMessenger =
-                new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, feedMessage);
-        return new Pair<>(feedMessenger, partitionConstraint);
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
-            FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, EntityId sourceFeedId)
-            throws AlgebricksException {
-        IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
-                completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
-        return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
-    }
-
-    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations();
-        Set<String> nodes = new TreeSet<>();
-        for (String node : allCluster.getLocations()) {
-            nodes.add(node);
-        }
-        AlgebricksAbsolutePartitionConstraint locations =
-                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
-        FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
-        spec.addRoot(frod);
-        return spec;
-    }
-}

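The two job builders in the file removed above wire their operators identically: a single source operator (the intake adaptor runtime or the feed messenger) drains into a null sink placed on the same partitions, and the sink is the job's only root. A condensed sketch of that wiring, with makeSourceOp as a hypothetical placeholder for whichever runtime the builder produces:

    // Sketch of the wiring shared by the removed intake/disconnect builders.
    JobSpecification spec = JobSpecificationUtils.createJobSpecification();
    IOperatorDescriptor source = makeSourceOp(spec); // hypothetical placeholder
    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, source, locations);
    NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, locations);
    spec.connect(new OneToOneConnectorDescriptor(spec), source, 0, nullSink, 0);
    spec.addRoot(nullSink);
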
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index 6be7af9..ec7c239 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -24,17 +24,19 @@ import java.util.List;
 
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.api.IFeedWork;
 import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
@@ -48,6 +50,7 @@ public class FeedWorkCollection {
 
     private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
     private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+    private static final IStorageComponentProvider storageComponentProvider = new StorageComponentProvider();
 
     /**
      * The task of subscribing to a feed to obtain data.
@@ -91,7 +94,8 @@ public class FeedWorkCollection {
                     List<Statement> statements = new ArrayList<>();
                     statements.add(dataverseDecl);
                     statements.add(subscribeStmt);
-                    IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider);
+                    IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider,
+                            storageComponentProvider);
                     translator.compileAndExecute(AppContextInfo.INSTANCE.getHcc(), null,
                             QueryTranslator.ResultDelivery.IMMEDIATE);
                     if (LOGGER.isEnabledFor(Level.INFO)) {

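The change above shows the practical shape of the new extension point: the statement executor factory now receives an IStorageComponentProvider alongside the compilation provider, so the storage stack is chosen at translator-creation time. A sketch of a caller supplying its own provider (the default throughout this commit is the new StorageComponentProvider):

    // Sketch: any IStorageComponentProvider implementation could be passed
    // here; this commit always uses the default StorageComponentProvider.
    IStorageComponentProvider provider = new StorageComponentProvider();
    IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider, provider);
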
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b1ca062..b114e8c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -29,26 +29,26 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.FileMapManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -62,6 +62,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -74,7 +75,6 @@ import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
@@ -99,13 +99,12 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
-public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvider {
+public class NCAppRuntimeContext implements IAppRuntimeContext {
     private static final Logger LOGGER = Logger.getLogger(NCAppRuntimeContext.class.getName());
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
     private final IResourceIdFactory resourceIdFactory;
-
     private CompilerProperties compilerProperties;
     private ExternalProperties externalProperties;
     private MetadataProperties metadataProperties;
@@ -115,7 +114,6 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
     private BuildProperties buildProperties;
     private ReplicationProperties replicationProperties;
     private MessagingProperties messagingProperties;
-
     private ThreadExecutor threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IFileMapManager fileMapManager;
@@ -136,14 +134,14 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
 
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
+    private final IStorageComponentProvider componentProvider;
 
     public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions)
-            throws AsterixException, InstantiationException, IllegalAccessException,
-            ClassNotFoundException, IOException {
+            throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
+            IOException {
         List<AsterixExtension> allExtensions = new ArrayList<>();
         this.ncApplicationContext = ncApplicationContext;
-        PropertiesAccessor propertiesAccessor =
-                PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
+        PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
         compilerProperties = new CompilerProperties(propertiesAccessor);
         externalProperties = new ExternalProperties(propertiesAccessor);
         metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -159,6 +157,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
         }
         allExtensions.addAll(new ExtensionProperties(propertiesAccessor).getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
+        componentProvider = new StorageComponentProvider();
         resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 
@@ -181,16 +180,15 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
-                new PersistentLocalResourceRepositoryFactory(
-                        ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+                new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(),
+                        metadataProperties);
 
-        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
-                .createRepository();
+        localResourceRepository =
+                (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
 
-        IAppRuntimeContextProvider asterixAppRuntimeContextProvider =
-                new AppRuntimeContextProviderForRecovery(this);
-        txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
-                txnProperties);
+        IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this);
+        txnSubsystem = new TransactionSubsystem(ncApplicationContext, ncApplicationContext.getNodeId(),
+                asterixAppRuntimeContextProvider, txnProperties);
 
         IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
         SystemState systemState = recoveryMgr.getSystemState();
@@ -448,9 +446,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
         // This way we can delay the registration of the metadataNode until
         // it is completely initialized.
         MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
-        MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+        MetadataBootstrap.startUniverse(ncApplicationContext, newUniverse);
         MetadataBootstrap.startDDLRecovery();
-        ncExtensionManager.initializeMetadata();
+        ncExtensionManager.initializeMetadata(ncApplicationContext);
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Metadata node bound");
@@ -473,4 +471,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi
         return ncExtensionManager;
     }
 
+    @Override
+    public IStorageComponentProvider getStorageComponentProvider() {
+        return componentProvider;
+    }
+
 }

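With the accessor added above, NC-side consumers obtain the shared storage component provider from the runtime context rather than instantiating one per use site. A minimal sketch, assuming an IAppRuntimeContext handle is already in scope (how it is obtained is elided here):

    // Sketch: fetch the provider installed by the NCAppRuntimeContext constructor.
    IStorageComponentProvider provider = appRuntimeContext.getStorageComponentProvider();
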
