asterixdb-commits mailing list archives

From buyin...@apache.org
Subject [10/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:52 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
deleted file mode 100644
index d6065fb..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ /dev/null
@@ -1,3107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.aql.translator;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.api.common.APIFramework;
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.app.external.ExternalIndexingOperations;
-import org.apache.asterix.app.external.FeedJoint;
-import org.apache.asterix.app.external.FeedOperations;
-import org.apache.asterix.common.config.AsterixExternalProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.config.MetadataConstants;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeed.FeedType;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.file.DatasetOperations;
-import org.apache.asterix.file.DataverseOperations;
-import org.apache.asterix.file.IndexOperations;
-import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
-import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.common.base.IStatementRewriter;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.expression.TypeExpression;
-import org.apache.asterix.lang.common.statement.CompactStatement;
-import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
-import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
-import org.apache.asterix.lang.common.statement.CreateFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
-import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
-import org.apache.asterix.lang.common.statement.DatasetDecl;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.statement.DataverseDropStatement;
-import org.apache.asterix.lang.common.statement.DeleteStatement;
-import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.statement.DropStatement;
-import org.apache.asterix.lang.common.statement.ExternalDetailsDecl;
-import org.apache.asterix.lang.common.statement.FeedDropStatement;
-import org.apache.asterix.lang.common.statement.FeedPolicyDropStatement;
-import org.apache.asterix.lang.common.statement.FunctionDecl;
-import org.apache.asterix.lang.common.statement.FunctionDropStatement;
-import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.IndexDropStatement;
-import org.apache.asterix.lang.common.statement.InsertStatement;
-import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
-import org.apache.asterix.lang.common.statement.LoadStatement;
-import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
-import org.apache.asterix.lang.common.statement.NodegroupDecl;
-import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement;
-import org.apache.asterix.lang.common.statement.RunStatement;
-import org.apache.asterix.lang.common.statement.SetStatement;
-import org.apache.asterix.lang.common.statement.TypeDecl;
-import org.apache.asterix.lang.common.statement.TypeDropStatement;
-import org.apache.asterix.lang.common.statement.WriteStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.metadata.IDatasetDetails;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataEntity;
-import org.apache.asterix.metadata.dataset.hints.DatasetHints;
-import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.CompactionPolicy;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
-import org.apache.asterix.metadata.utils.MetadataLockManager;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
-import org.apache.asterix.result.ResultReader;
-import org.apache.asterix.result.ResultUtils;
-import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
-import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
-import org.apache.asterix.translator.TypeTranslator;
-import org.apache.asterix.translator.util.ValidateUtil;
-import org.apache.asterix.util.FlushDatasetUtils;
-import org.apache.asterix.util.JobUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Lists;
-
-/*
- * Provides functionality for executing a batch of statements (queries included)
- * sequentially.
- */
-public class QueryTranslator extends AbstractLangTranslator {
-
-    private static final Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName());
-
-    private enum ProgressState {
-        NO_PROGRESS,
-        ADDED_PENDINGOP_RECORD_TO_METADATA
-    }
-
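-    /*
-     * Result delivery modes (summary; see the QUERY case in compileAndExecute below): SYNC reads
-     * the result set before returning to the caller, while ASYNC and ASYNC_DEFERRED both switch
-     * the metadata provider into asynchronous-result mode; ASYNC_DEFERRED is meant to defer
-     * reading the results until the job has completed.
-     */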
-    public enum ResultDelivery {
-        SYNC,
-        ASYNC,
-        ASYNC_DEFERRED
-    }
-
-    public static final boolean IS_DEBUG_MODE = false; // set to true to enable debug mode
-    private final List<Statement> statements;
-    private final SessionConfig sessionConfig;
-    private Dataverse activeDefaultDataverse;
-    private final List<FunctionDecl> declaredFunctions;
-    private final APIFramework apiFramework;
-    private final IRewriterFactory rewriterFactory;
-
-    public QueryTranslator(List<Statement> aqlStatements, SessionConfig conf,
-            ILangCompilationProvider compilationProvider) {
-        this.statements = aqlStatements;
-        this.sessionConfig = conf;
-        this.declaredFunctions = getDeclaredFunctions(aqlStatements);
-        this.apiFramework = new APIFramework(compilationProvider);
-        this.rewriterFactory = compilationProvider.getRewriterFactory();
-    }
-
-    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
-        List<FunctionDecl> functionDecls = new ArrayList<>();
-        for (Statement st : statements) {
-            if (st.getKind() == Statement.Kind.FUNCTION_DECL) {
-                functionDecls.add((FunctionDecl) st);
-            }
-        }
-        return functionDecls;
-    }
-
-    /**
-     * Compiles and submits for execution a list of AQL statements.
-     *
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            The result delivery mode: SYNC, ASYNC, or ASYNC_DEFERRED.
-     * @throws Exception
-     */
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
-            throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, new ResultUtils.Stats());
-    }
-
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            ResultUtils.Stats stats) throws Exception {
-        int resultSetIdCounter = 0;
-        FileSplit outputFile = null;
-        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
-        IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
-        Map<String, String> config = new HashMap<>();
-        /* Since the system runs a large number of threads, when HTTP requests don't return, it becomes difficult to
-         * find the thread running the request to determine where it has stopped.
-         * Setting the thread name helps make that easier
-         */
-        String threadName = Thread.currentThread().getName();
-        Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
-        try {
-            for (Statement stmt : statements) {
-                if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-                    sessionConfig.out().println(APIFramework.HTML_STATEMENT_SEPARATOR);
-                }
-                validateOperation(activeDefaultDataverse, stmt);
-                rewriteStatement(stmt); // Rewrite the statement's AST.
-                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
-                metadataProvider.setWriterFactory(writerFactory);
-                metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
-                metadataProvider.setOutputFile(outputFile);
-                metadataProvider.setConfig(config);
-                switch (stmt.getKind()) {
-                    case Statement.Kind.SET:
-                        handleSetStatement(stmt, config);
-                        break;
-                    case Statement.Kind.DATAVERSE_DECL:
-                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.CREATE_DATAVERSE:
-                        handleCreateDataverseStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.DATASET_DECL:
-                        handleCreateDatasetStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.CREATE_INDEX:
-                        handleCreateIndexStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.TYPE_DECL:
-                        handleCreateTypeStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.NODEGROUP_DECL:
-                        handleCreateNodeGroupStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.DATAVERSE_DROP:
-                        handleDataverseDropStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.DATASET_DROP:
-                        handleDatasetDropStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.INDEX_DROP:
-                        handleIndexDropStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.TYPE_DROP:
-                        handleTypeDropStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.NODEGROUP_DROP:
-                        handleNodegroupDropStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.CREATE_FUNCTION:
-                        handleCreateFunctionStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.FUNCTION_DROP:
-                        handleFunctionDropStatement(metadataProvider, stmt);
-                        break;
-                    case Statement.Kind.LOAD:
-                        handleLoadStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.INSERT:
-                    case Statement.Kind.UPSERT:
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.DELETE:
-                        handleDeleteStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.CREATE_PRIMARY_FEED:
-                    case Statement.Kind.CREATE_SECONDARY_FEED:
-                        handleCreateFeedStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.DROP_FEED:
-                        handleDropFeedStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.DROP_FEED_POLICY:
-                        handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.CONNECT_FEED:
-                        handleConnectFeedStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.DISCONNECT_FEED:
-                        handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.SUBSCRIBE_FEED:
-                        handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.CREATE_FEED_POLICY:
-                        handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.QUERY:
-                        metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                        metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
-                                || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
-                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
-                        break;
-                    case Statement.Kind.COMPACT:
-                        handleCompactStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.EXTERNAL_DATASET_REFRESH:
-                        handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
-                        break;
-                    case Statement.Kind.WRITE:
-                        Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(stmt);
-                        writerFactory = (result.first != null) ? result.first : writerFactory;
-                        outputFile = result.second;
-                        break;
-                    case Statement.Kind.RUN:
-                        handleRunStatement(metadataProvider, stmt, hcc);
-                        break;
-                    default:
-                        // By default, delegate unknown statements to the extension manager.
-                        break;
-                }
-            }
-        } finally {
-            Thread.currentThread().setName(threadName);
-        }
-    }
-
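-    /*
-     * A SET statement simply records a (property, value) pair in the per-request config map that
-     * is later passed to the AqlMetadataProvider. Illustrative AQL (the property name is only an
-     * example):
-     *
-     *   set import-private-functions "true";
-     *
-     * which would put ("import-private-functions", "true") into the config map.
-     */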
-    private void handleSetStatement(Statement stmt, Map<String, String> config) {
-        SetStatement ss = (SetStatement) stmt;
-        String pname = ss.getPropName();
-        String pvalue = ss.getPropValue();
-        config.put(pname, pvalue);
-    }
-
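-    /*
-     * A WRITE statement redirects subsequent query output to a file on a particular NC.
-     * Illustrative AQL (node name and path are only examples):
-     *
-     *   write output to nc1:"/tmp/results.adm";
-     *
-     * If a writer class name is given, it is instantiated reflectively as a custom
-     * IAWriterFactory; otherwise the previously configured writer factory is kept.
-     */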
-    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(Statement stmt)
-            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        WriteStatement ws = (WriteStatement) stmt;
-        File f = new File(ws.getFileName());
-        FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
-        IAWriterFactory writerFactory = null;
-        if (ws.getWriterClassName() != null) {
-            writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
-        }
-        return new Pair<>(writerFactory, outputFile);
-    }
-
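-    /*
-     * Most handlers below follow the same metadata protocol: begin a metadata transaction and
-     * register it with the metadata provider, acquire the relevant metadata lock(s), perform the
-     * lookups/updates, and commit; on any exception the transaction is aborted and the exception
-     * rethrown, and the lock(s) are released in a finally block. This handler is the simplest
-     * example of that pattern.
-     */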
-    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
-        DataverseDecl dvd = (DataverseDecl) stmt;
-        String dvName = dvd.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-            if (dv == null) {
-                throw new MetadataException("Unknown dataverse " + dvName);
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            return dv;
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw new MetadataException(e);
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
-    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-
-        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-        String dvName = stmtCreateDataverse.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-            if (dv != null) {
-                if (stmtCreateDataverse.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-                }
-            }
-            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
-                    new Dataverse(dvName, stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
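-    /*
-     * Validates a user-specified compaction (LSM merge) policy against the policies registered in
-     * the Metadata dataverse. Illustrative DDL (policy and property names depend on the installed
-     * policies):
-     *
-     *   create dataset Customers(CustomerType) primary key cid
-     *   using compaction policy prefix (("max-mergable-component-size"="1073741824"),
-     *                                   ("max-tolerance-component-count"="5"));
-     *
-     * Every supplied property must be known to the policy factory, and all of the factory's
-     * properties must be supplied; only the "no-merge" policy may omit properties.
-     */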
-    private void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
-            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws AsterixException, Exception {
-        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
-        if (compactionPolicyEntity == null) {
-            throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
-        }
-        String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory =
-                (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
-        if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
-            throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
-        }
-        if (compactionPolicyProperties == null) {
-            if (mergePolicyFactory.getName().compareTo("no-merge") != 0) {
-                throw new AsterixException("Compaction policy properties are missing.");
-            }
-        } else {
-            for (Map.Entry<String, String> entry : compactionPolicyProperties.entrySet()) {
-                if (!mergePolicyFactory.getPropertiesNames().contains(entry.getKey())) {
-                    throw new AsterixException("Invalid compaction policy property: " + entry.getKey());
-                }
-            }
-            for (String p : mergePolicyFactory.getPropertiesNames()) {
-                if (!compactionPolicyProperties.containsKey(p)) {
-                    throw new AsterixException("Missing compaction policy property: " + p);
-                }
-            }
-        }
-    }
-
-    private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = getActiveDataverse(dd.getDataverse());
-        String datasetName = dd.getName().getValue();
-        DatasetType dsType = dd.getDatasetType();
-        String itemTypeDataverseName = dd.getItemTypeDataverse().getValue();
-        String itemTypeName = dd.getItemTypeName().getValue();
-        String metaItemTypeDataverseName = dd.getMetaItemTypeDataverse().getValue();
-        String metaItemTypeName = dd.getMetaItemTypeName().getValue();
-        Identifier ngNameId = dd.getNodegroupName();
-        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
-        String compactionPolicy = dd.getCompactionPolicy();
-        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
-        boolean defaultCompactionPolicy = (compactionPolicy == null);
-        boolean temp = dd.getDatasetDetailsDecl().isTemp();
-
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, itemTypeDataverseName,
-                itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
-                metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
-                dataverseName + "." + datasetName, defaultCompactionPolicy);
-        Dataset dataset = null;
-        try {
-
-            IDatasetDetails datasetDetails = null;
-            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-            if (ds != null) {
-                if (dd.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
-                }
-            }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    itemTypeDataverseName, itemTypeName);
-            if (dt == null) {
-                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-            }
-            String ngName =
-                    ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
-
-            if (compactionPolicy == null) {
-                compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
-                compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-            } else {
-                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
-            }
-            switch (dd.getDatasetType()) {
-                case INTERNAL:
-                    IAType itemType = dt.getDatatype();
-                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                        throw new AlgebricksException("Dataset type has to be a record type.");
-                    }
-
-                    IAType metaItemType = null;
-                    if (metaItemTypeDataverseName != null && metaItemTypeName != null) {
-                        metaItemType = metadataProvider.findType(metaItemTypeDataverseName, metaItemTypeName);
-                    }
-                    if (metaItemType != null && metaItemType.getTypeTag() != ATypeTag.RECORD) {
-                        throw new AlgebricksException("Dataset meta type has to be a record type.");
-                    }
-                    ARecordType metaRecType = (ARecordType) metaItemType;
-
-                    List<List<String>> partitioningExprs =
-                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-                    List<Integer> keySourceIndicators =
-                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
-                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
-                    ARecordType aRecordType = (ARecordType) itemType;
-                    List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType,
-                            metaRecType, partitioningExprs, keySourceIndicators, autogenerated);
-
-                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
-                    if (filterField != null) {
-                        ValidateUtil.validateFilterField(aRecordType, filterField);
-                    }
-                    if (compactionPolicy == null) {
-                        if (filterField != null) {
-                            // If the dataset has a filter and the user didn't specify a merge
-                            // policy, then we will pick the
-                            // correlated-prefix as the default merge policy.
-                            compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
-                            compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-                        }
-                    }
-                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            keySourceIndicators, partitioningTypes, autogenerated, filterField, temp);
-                    break;
-                case EXTERNAL:
-                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-
-                    datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
-                            ExternalDatasetTransactionState.COMMIT);
-                    break;
-                default:
-                    throw new AsterixException("Unknown datatype " + dd.getDatasetType());
-            }
-
-            // #. initialize DatasetIdFactory if it is not initialized.
-            if (!DatasetIdFactory.isInitialized()) {
-                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
-            }
-
-            // #. add a new dataset with PendingAddOp
-            dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName,
-                    metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties,
-                    datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
-                    IMetadataEntity.PENDING_ADD_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-
-            if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse =
-                        MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
-                JobSpecification jobSpec =
-                        DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
-
-                // #. make metadataTxn commit before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                // #. runJob
-                JobUtils.runJob(hcc, jobSpec, true);
-
-                // #. begin new metadataTxn
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            }
-
-            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
-            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-
-                // #. execute compensation operations
-                // remove the index in NC
-                // [Notice]
-                // As long as we updated(and committed) metadata, we should remove any effect of the job
-                // because an exception occurs during runJob.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                try {
-                    JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-
-                // remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, itemTypeDataverseName,
-                    itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
-                    metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
-                    dataverseName + "." + datasetName, defaultCompactionPolicy);
-        }
-    }
-
-    private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
-        StringBuilder builder = null;
-        IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-        for (IActiveEntityEventsListener listener : listeners) {
-            if (listener instanceof FeedEventsListener
-                    && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
-                if (builder == null) {
-                    builder = new StringBuilder();
-                }
-                builder.append(listener.getEntityId() + "\n");
-            }
-        }
-        if (builder != null) {
-            throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being "
-                    + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
-        }
-    }
-
-    private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
-        if (ngNameId != null) {
-            return ngNameId.getValue();
-        }
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-        } else {
-            return dataverse + ":" + dd.getName().getValue();
-        }
-    }
-
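-    /*
-     * When the dataset declaration carries a nodegroup-cardinality hint, a dedicated nodegroup
-     * named "<dataverse>:<dataset>" is created for it: the metadata node is always included, and
-     * the remaining (cardinality - 1) nodes are picked from the cluster's node list via a partial
-     * random shuffle over the index array b below. Without the hint, the default metadata
-     * nodegroup is reused.
-     */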
-    private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
-        int nodegroupCardinality;
-        String nodegroupName;
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-            return nodegroupName;
-        } else {
-            int numChosen = 0;
-            boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
-                    dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
-            if (!valid) {
-                throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
-            } else {
-                nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
-            }
-            List<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
-            List<String> nodeNamesClone = new ArrayList<String>(nodeNames);
-            String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-            List<String> selectedNodes = new ArrayList<String>();
-            selectedNodes.add(metadataNodeName);
-            numChosen++;
-            nodeNamesClone.remove(metadataNodeName);
-
-            if (numChosen < nodegroupCardinality) {
-                Random random = new Random();
-                String[] nodes = nodeNamesClone.toArray(new String[] {});
-                int[] b = new int[nodeNamesClone.size()];
-                for (int i = 0; i < b.length; i++) {
-                    b[i] = i;
-                }
-
-                for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
-                    int selected = i + random.nextInt(nodeNamesClone.size() - i);
-                    int selNodeIndex = b[selected];
-                    selectedNodes.add(nodes[selNodeIndex]);
-                    int temp = b[0];
-                    b[0] = b[selected];
-                    b[selected] = temp;
-                }
-            }
-            nodegroupName = dataverse + ":" + dd.getName().getValue();
-            MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
-            return nodegroupName;
-        }
-
-    }
-
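-    /*
-     * Index creation follows the same "pending op" protocol used for dataset creation: the index
-     * is first added to the metadata with PENDING_ADD_OP and the transaction committed, the
-     * index-build jobs are then run on the NCs, and finally the entry is replaced with
-     * PENDING_NO_OP in a fresh transaction. If anything fails after the pending record was
-     * committed, compensation jobs drop the partially created index and remove the pending record.
-     * For external datasets, a hidden "files" index is additionally created and replicated the
-     * first time an index is built on the dataset.
-     */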
-    private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
-        String datasetName = stmtCreateIndex.getDatasetName().getValue();
-        List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
-
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
-        String indexName = null;
-        JobSpecification spec = null;
-        Dataset ds = null;
-        // For external datasets
-        ArrayList<ExternalFile> externalFilesSnapshot = null;
-        boolean firstExternalDatasetIndex = false;
-        boolean filesIndexReplicated = false;
-        Index filesIndex = null;
-        boolean datasetLocked = false;
-        try {
-            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-            if (ds == null) {
-                throw new AlgebricksException(
-                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
-            }
-
-            indexName = stmtCreateIndex.getIndexName().getValue();
-            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName, indexName);
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    ds.getItemTypeDataverseName(), ds.getItemTypeName());
-            ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType metaRecordType = null;
-            if (ds.hasMetaPart()) {
-                Datatype metaDt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                        ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName());
-                metaRecordType = (ARecordType) metaDt.getDatatype();
-            }
-
-            List<List<String>> indexFields = new ArrayList<List<String>>();
-            List<IAType> indexFieldTypes = new ArrayList<IAType>();
-            int keyIndex = 0;
-            for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
-                IAType fieldType = null;
-                ARecordType subType =
-                        KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType);
-                boolean isOpen = subType.isOpen();
-                int i = 0;
-                if (fieldExpr.first.size() > 1 && !isOpen) {
-                    for (; i < fieldExpr.first.size() - 1;) {
-                        subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i));
-                        i++;
-                        if (subType.isOpen()) {
-                            isOpen = true;
-                            break;
-                        }
-                    }
-                }
-                if (fieldExpr.second == null) {
-                    fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
-                } else {
-                    if (!stmtCreateIndex.isEnforced()) {
-                        throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first
-                                + "\" field without enforcing it's type");
-                    }
-                    if (!isOpen) {
-                        throw new AlgebricksException("Typed index on \"" + fieldExpr.first
-                                + "\" field could be created only for open datatype");
-                    }
-                    if (stmtCreateIndex.hasMetaField()) {
-                        throw new AlgebricksException("Typed open index can only be created on the record part");
-                    }
-                    Map<TypeSignature, IAType> typeMap =
-                            TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second, indexName, dataverseName);
-                    TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
-                    fieldType = typeMap.get(typeSignature);
-                }
-                if (fieldType == null) {
-                    throw new AlgebricksException(
-                            "Unknown type " + (fieldExpr.second == null ? fieldExpr.first : fieldExpr.second));
-                }
-
-                indexFields.add(fieldExpr.first);
-                indexFieldTypes.add(fieldType);
-                ++keyIndex;
-            }
-
-            ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators,
-                    indexFieldTypes, stmtCreateIndex.getIndexType());
-
-            if (idx != null) {
-                if (stmtCreateIndex.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-                }
-            }
-
-            // Checks whether a user is trying to create an inverted secondary index on a dataset
-            // with a variable-length primary key.
-            // Currently, we do not support this. Therefore, as a temporary solution, we print an
-            // error message and stop.
-            if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-                List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds);
-                for (List<String> partitioningKey : partitioningKeys) {
-                    IAType keyType = aRecordType.getSubFieldType(partitioningKey);
-                    ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-
-                    // If it is not a fixed length
-                    if (typeTrait.getFixedLength() < 0) {
-                        throw new AlgebricksException("The keyword or ngram index -" + indexName
-                                + " cannot be created on the dataset -" + datasetName
-                                + " due to its variable-length primary key field - " + partitioningKey);
-                    }
-
-                }
-            }
-
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
-            } else {
-                // External dataset
-                // Check if the dataset is indexible
-                if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
-                    throw new AlgebricksException(
-                            "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
-                                    + " Adapter can't be indexed");
-                }
-                // Check if the name of the index is valid
-                if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
-                    throw new AlgebricksException("external dataset index name is invalid");
-                }
-
-                // Check if the files index exist
-                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                firstExternalDatasetIndex = (filesIndex == null);
-                // Lock external dataset
-                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
-                datasetLocked = true;
-                if (firstExternalDatasetIndex) {
-                    // Verify that no one has created an index before we acquire the lock
-                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                    if (filesIndex != null) {
-                        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
-                        firstExternalDatasetIndex = false;
-                        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
-                    }
-                }
-                if (firstExternalDatasetIndex) {
-                    // Get snapshot from External File System
-                    externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    // Add an entry for the files index
-                    filesIndex = new Index(dataverseName, datasetName,
-                            ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
-                            IMetadataEntity.PENDING_ADD_OP);
-                    MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                    // Add files to the external files index
-                    for (ExternalFile file : externalFilesSnapshot) {
-                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                    }
-                    // This is the first index for the external dataset, replicate the files index
-                    spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
-                            metadataProvider, true);
-                    if (spec == null) {
-                        throw new AsterixException(
-                                "Failed to create job spec for replicating Files Index For external dataset");
-                    }
-                    filesIndexReplicated = true;
-                    JobUtils.runJob(hcc, spec, true);
-                }
-            }
-
-            // check whether there exists another enforced index on the same field
-            if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE
-                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-                for (Index index : indexes) {
-                    if (index.getKeyFieldNames().equals(indexFields)
-                            && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds()) {
-                        throw new AsterixException("Cannot create index " + indexName + " , enforced index "
-                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
-                                + "\" is already defined with type \"" + index.getKeyFieldTypes() + "\"");
-                    }
-                }
-            }
-
-            // #. add a new index with PendingAddOp
-            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
-                    false, IMetadataEntity.PENDING_ADD_OP);
-            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-
-            ARecordType enforcedType = null;
-            if (stmtCreateIndex.isEnforced()) {
-                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType,
-                        Lists.newArrayList(index));
-            }
-
-            // #. prepare to create the index artifact in NC.
-            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-                    index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                    index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType,
-                    keySourceIndicators, enforcedType, metadataProvider);
-            if (spec == null) {
-                throw new AsterixException("Failed to create job spec for creating index '"
-                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-            // #. create the index artifact in NC.
-            JobUtils.runJob(hcc, spec, true);
-
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            // #. load data into the index in NC.
-            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
-                    index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
-                    index.getGramLength(), index.getIndexType());
-
-            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType,
-                    keySourceIndicators, enforcedType, metadataProvider);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            JobUtils.runJob(hcc, spec, true);
-
-            // #. begin new metadataTxn
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
-            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                    indexName);
-            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            // add another new files index with PendingNoOp after deleting the index with
-            // PendingAddOp
-            if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                        filesIndex.getIndexName());
-                filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-                MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                // update transaction timestamp
-                ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
-                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-            // If the files index was replicated for the external dataset, it should be cleaned up on the NC side
-            if (filesIndexReplicated) {
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                        ExternalIndexingOperations.getFilesIndexName(datasetName));
-                try {
-                    JobSpecification jobSpec =
-                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                // #. execute compensation operations
-                // remove the index in NC
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                try {
-                    JobSpecification jobSpec =
-                            IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
-
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-
-                if (firstExternalDatasetIndex) {
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                    try {
-                        // Drop External Files from metadata
-                        MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e2) {
-                        e.addSuppressed(e2);
-                        abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is in inconsistent state: pending files for("
-                                + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
-                    }
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                    try {
-                        // Drop the files index from metadata
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e2) {
-                        e.addSuppressed(e2);
-                        abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
-                                + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName)
-                                + ") couldn't be removed from the metadata", e);
-                    }
-                }
-                // remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, indexName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
-                }
-            }
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
-            if (datasetLocked) {
-                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
-            }
-        }
-    }
-
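-    /**
-     * Handles a CREATE TYPE statement: verifies that the target dataverse exists and that the type
-     * name neither clashes with an existing datatype (unless IF NOT EXISTS is given) nor redefines a
-     * builtin type, then translates the type definition and adds the datatype to the metadata.
-     */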
-    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-        TypeDecl stmtCreateType = (TypeDecl) stmt;
-        String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName());
-        String typeName = stmtCreateType.getIdent().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
-        try {
-
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-            if (dv == null) {
-                throw new AlgebricksException("Unknown dataverse " + dataverseName);
-            }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-            if (dt != null) {
-                if (!stmtCreateType.getIfNotExists()) {
-                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-                }
-            } else {
-                if (builtinTypeMap.get(typeName) != null) {
-                    throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-                } else {
-                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx,
-                            stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(), dataverseName);
-                    TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-                    IAType type = typeMap.get(typeSignature);
-                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-                }
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
-        }
-    }
-
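-    /**
-     * Handles a DATAVERSE DROP statement: disconnects any feeds active in the dataverse, prepares and
-     * runs jobs that drop its datasets and indexes on the NCs, and removes the dataverse from the
-     * metadata using the PendingDropOp protocol so that a failure can be compensated.
-     */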
-    private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-        String dataverseName = stmtDelete.getDataverseName().getValue();
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-            if (dv == null) {
-                if (stmtDelete.getIfExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
-                }
-            }
-            // # disconnect all feeds from any datasets in the dataverse.
-            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            Identifier dvId = new Identifier(dataverseName);
-            for (IActiveEntityEventsListener listener : activeListeners) {
-                EntityId activeEntityId = listener.getEntityId();
-                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && activeEntityId.getDataverse().equals(dataverseName)) {
-                    FeedEventsListener feedEventListener = (FeedEventsListener) listener;
-                    FeedConnectionId[] connections = feedEventListener.getConnections();
-                    for (FeedConnectionId conn : connections) {
-                        disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc);
-                    }
-                    // prepare job to remove feed log storage
-                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
-                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
-                }
-            }
-
-            // #. prepare jobs which will drop corresponding datasets with indexes.
-            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
-            for (int j = 0; j < datasets.size(); j++) {
-                String datasetName = datasets.get(j).getDatasetName();
-                DatasetType dsType = datasets.get(j).getDatasetType();
-                if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes =
-                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                    for (int k = 0; k < indexes.size(); k++) {
-                        if (indexes.get(k).isSecondaryIndex()) {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                    datasets.get(j)));
-                        }
-                    }
-
-                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-                } else {
-                    // External dataset
-                    List<Index> indexes =
-                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                    for (int k = 0; k < indexes.size(); k++) {
-                        if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, datasets.get(j)));
-                        } else {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                    datasets.get(j)));
-                        }
-                    }
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
-                }
-            }
-            jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-            // #. mark PendingDropOp on the dataverse record by
-            // first, deleting the dataverse record from the DATAVERSE_DATASET
-            // second, inserting the dataverse record with the PendingDropOp value into the
-            // DATAVERSE_DATASET
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
-                    new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-            for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
-            }
-
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            // #. finally, delete the dataverse.
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            if (activeDefaultDataverse != null && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
-                activeDefaultDataverse = null;
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                if (activeDefaultDataverse != null && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
-                    activeDefaultDataverse = null;
-                }
-
-                // #. execute compensation operations
-                // remove all the indexes in NC
-                try {
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
-                    }
-                } catch (Exception e2) {
-                    // do not throw an exception since the metadata still needs to be compensated.
-                    e.addSuppressed(e2);
-                }
-
-                // remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                try {
-                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending dataverse(" + dataverseName
-                            + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
-        }
-    }
-
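-    /**
-     * Disconnects a single feed connection before its dataverse is dropped. Failures are logged as
-     * warnings rather than propagated so that the enclosing drop can proceed.
-     */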
-    private void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn,
-            AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc) {
-        DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId,
-                new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName()));
-        try {
-            handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset "
-                        + conn.getDatasetName());
-            }
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to disconnect feed " + activeEntityId.getEntityName() + " from dataset "
-                        + conn.getDatasetName() + ". Encountered exception " + exception);
-            }
-        }
-    }
-
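-    /**
-     * Handles a DATASET DROP statement: disconnects feeds connected to the dataset, prepares and runs
-     * jobs that drop the dataset and its indexes on the NCs, and removes the dataset (and its
-     * non-default nodegroup) from the metadata using the PendingDropOp protocol.
-     */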
-    private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        DropStatement stmtDelete = (DropStatement) stmt;
-        String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
-        String datasetName = stmtDelete.getDatasetName().getValue();
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            if (ds == null) {
-                if (stmtDelete.getIfExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                            + dataverseName + ".");
-                }
-            }
-
-            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-                IActiveEntityEventsListener[] feedConnections =
-                        ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-                for (IActiveEntityEventsListener conn : feedConnections) {
-                    if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                            && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) {
-                        FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName);
-                        Pair<JobSpecification, Boolean> p =
-                                FeedOperations.buildDisconnectFeedJobSpec(metadataProvider, connectionId);
-                        disconnectJobList.put(connectionId, p);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName()
-                                    + " from dataset " + datasetName + " as dataset is being dropped");
-                        }
-                        // prepare job to remove feed log storage
-                        jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
-                                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionId.getFeedId().getDataverse(),
-                                        connectionId.getFeedId().getEntityName())));
-                    }
-                }
-
-                // #. prepare jobs to drop the dataset and the indexes in NC
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (indexes.get(j).isSecondaryIndex()) {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                    }
-                }
-                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-
-                // #. mark the existing dataset as PendingDropOp
-                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                                ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(),
-                                ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
-                                ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
-                                IMetadataEntity.PENDING_DROP_OP));
-
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                // # disconnect the feeds
-                for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                    JobUtils.runJob(hcc, p.first, true);
-                }
-
-                // #. run the jobs
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
-                }
-
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            } else {
-                // External dataset
-                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                // #. prepare jobs to drop the dataset and the indexes in NC
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                    } else {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute
-                                .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
-                    }
-                }
-
-                // #. mark the existing dataset as PendingDropOp
-                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                                ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(),
-                                ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
-                                IMetadataEntity.PENDING_DROP_OP));
-
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                // #. run the jobs
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
-                }
-                if (indexes.size() > 0) {
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                }
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            }
-
-            // #. finally, delete the dataset.
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-            // Drop the associated nodegroup
-            String nodegroup = ds.getNodeGroupName();
-            if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-                MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
-            }
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                // #. execute compensation operations
-                // remove all the indexes in NC
-                try {
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
-                    }
-                } catch (Exception e2) {
-                    // do not throw an exception since the metadata still needs to be compensated.
-                    e.addSuppressed(e2);
-                }
-
-                // remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
-        }
-    }
-
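-    /**
-     * Handles an INDEX DROP statement: fails if a feed is currently connected to the dataset,
-     * prepares a job that drops the index on the NCs, and removes the index from the metadata using
-     * the PendingDropOp protocol.
-     */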
-    private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-
-        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
-        String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
-        String indexName = null;
-        // For external index
-        boolean dropFilesIndex = false;
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            if (ds == null) {
-                throw new AlgebricksException(
-                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
-            }
-            IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            StringBuilder builder = null;
-            for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
-                    if (builder == null) {
-                        builder = new StringBuilder();
-                    }
-                    builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
-                }
-            }
-            if (builder != null) {
-                throw new AsterixException(
-                        "Dataset " + datasetName + " is currently being fed into by the following feeds:\n"
-                                + builder.toString() + "\nOperation not supported.");
-            }
-
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                indexName = stmtIndexDrop.getIndexName().getValue();
-                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                if (index == null) {
-                    if (stmtIndexDrop.getIfExists()) {
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
-                    } else {
-                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
-                    }
-                }
-                // #. prepare a job to drop the index in NC.
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-
-                // #. mark PendingDropOp on the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
-                                index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
-                // #. commit the existing transaction before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-     

<TRUNCATED>
