asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [11/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:53 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
new file mode 100644
index 0000000..24c678c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -0,0 +1,3122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.translator;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.api.common.APIFramework;
+import org.apache.asterix.api.http.servlet.APIServlet;
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.external.ExternalIndexingOperations;
+import org.apache.asterix.app.external.FeedJoint;
+import org.apache.asterix.app.external.FeedOperations;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.api.IFeed.FeedType;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.file.DatasetOperations;
+import org.apache.asterix.file.DataverseOperations;
+import org.apache.asterix.file.IndexOperations;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.lang.common.base.IStatementRewriter;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.TypeExpression;
+import org.apache.asterix.lang.common.statement.CompactStatement;
+import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
+import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
+import org.apache.asterix.lang.common.statement.CreateFeedStatement;
+import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
+import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
+import org.apache.asterix.lang.common.statement.DatasetDecl;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.statement.DataverseDropStatement;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
+import org.apache.asterix.lang.common.statement.DropDatasetStatement;
+import org.apache.asterix.lang.common.statement.ExternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.FeedDropStatement;
+import org.apache.asterix.lang.common.statement.FeedPolicyDropStatement;
+import org.apache.asterix.lang.common.statement.FunctionDecl;
+import org.apache.asterix.lang.common.statement.FunctionDropStatement;
+import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
+import org.apache.asterix.lang.common.statement.IndexDropStatement;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.LoadStatement;
+import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
+import org.apache.asterix.lang.common.statement.NodegroupDecl;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement;
+import org.apache.asterix.lang.common.statement.RunStatement;
+import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.common.statement.TypeDecl;
+import org.apache.asterix.lang.common.statement.TypeDropStatement;
+import org.apache.asterix.lang.common.statement.WriteStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.IDatasetDetails;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.dataset.hints.DatasetHints;
+import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.CompactionPolicy;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.metadata.utils.MetadataLockManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeSignature;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
+import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import org.apache.asterix.translator.AbstractLangTranslator;
+import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
+import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.TypeTranslator;
+import org.apache.asterix.translator.util.ValidateUtil;
+import org.apache.asterix.util.FlushDatasetUtils;
+import org.apache.asterix.util.JobUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+
+/*
+ * Provides functionality for executing a batch of statements (DDL, DML and queries)
+ * sequentially, in the order in which they appear in the batch.
+ */
+public class QueryTranslator extends AbstractLangTranslator implements IStatementExecutor {
+
+    private static final Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName());
+
+    /**
+     * Progress marker used by statement handlers; for example, handleCreateDatasetStatement
+     * starts with a mutable holder set to {@link #NO_PROGRESS}.
+     * NOTE(review): presumably consulted when a handler fails, to decide whether metadata
+     * written so far must be cleaned up -- confirm against the handlers' error paths.
+     */
+    protected enum ProgressState {
+        // Initial value: nothing that would need compensation has been done yet.
+        NO_PROGRESS,
+        // NOTE(review): by its name, a metadata record flagged with a pending operation
+        // has been added -- confirm against the code that sets this value.
+        ADDED_PENDINGOP_RECORD_TO_METADATA
+    }
+
+    // Compile-time debug switch; currently off (the trailing comment records the alternative value).
+    public static final boolean IS_DEBUG_MODE = false;// true
+    // The statements executed, in list order, by compileAndExecute.
+    protected final List<Statement> statements;
+    // Session settings; compileAndExecute consults it for the output format and output stream.
+    protected final SessionConfig sessionConfig;
+    // Dataverse used to seed each statement's metadata provider; replaced when a
+    // DATAVERSE_DECL statement is handled. Not assigned by the constructor.
+    protected Dataverse activeDefaultDataverse;
+    // FUNCTION_DECL statements collected from the statement list at construction time.
+    protected final List<FunctionDecl> declaredFunctions;
+    // Built in the constructor from the compilation provider and the extension manager.
+    protected final APIFramework apiFramework;
+    // Obtained in the constructor from the compilation provider.
+    protected final IRewriterFactory rewriterFactory;
+
+    /**
+     * Creates a translator for the given batch of statements.
+     *
+     * @param aqlStatements
+     *            the statements to execute; the list is stored as-is (not copied)
+     * @param conf
+     *            the session configuration
+     * @param compliationProvider
+     *            supplies the rewriter factory and is handed to the {@link APIFramework}
+     * @param ccExtensionManager
+     *            the extension manager handed to the {@link APIFramework}
+     */
+    public QueryTranslator(List<Statement> aqlStatements, SessionConfig conf,
+            ILangCompilationProvider compliationProvider, CompilerExtensionManager ccExtensionManager) {
+        this.statements = aqlStatements;
+        this.sessionConfig = conf;
+        // Note: getDeclaredFunctions is protected and overridable, so a subclass override
+        // runs here before the subclass constructor body has executed.
+        this.declaredFunctions = getDeclaredFunctions(aqlStatements);
+        this.apiFramework = new APIFramework(compliationProvider, ccExtensionManager);
+        this.rewriterFactory = compliationProvider.getRewriterFactory();
+    }
+
+    /**
+     * Collects the function declarations contained in a batch of statements.
+     *
+     * @param statements
+     *            the statements to scan
+     * @return a new list holding every statement whose kind is FUNCTION_DECL, cast to
+     *         {@link FunctionDecl}, in the order encountered
+     */
+    protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
+        List<FunctionDecl> declared = new ArrayList<>();
+        Iterator<Statement> cursor = statements.iterator();
+        while (cursor.hasNext()) {
+            Statement candidate = cursor.next();
+            if (candidate.getKind() != Statement.Kind.FUNCTION_DECL) {
+                // Only function declarations are of interest here.
+                continue;
+            }
+            declared.add((FunctionDecl) candidate);
+        }
+        return declared;
+    }
+
+    /**
+     * Compiles and submits for execution a list of AQL statements. Delegates to the
+     * four-argument overload with a newly created {@link Stats} instance, which is not
+     * made available to the caller.
+     *
+     * @param hcc
+     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
+     * @param hdc
+     *            A Hyracks dataset client object that is used to read the results.
+     * @param resultDelivery
+     *            The result delivery mode, passed through unchanged to the four-argument overload.
+     * @throws Exception
+     *             whatever the four-argument overload throws
+     */
+    @Override
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+            throws Exception {
+        compileAndExecute(hcc, hdc, resultDelivery, new Stats());
+    }
+
+    /**
+     * Executes the statements of this translator one at a time, in list order.
+     * For each statement this method prints a separator when the session format is HTML,
+     * validates and rewrites the statement, creates a new {@link AqlMetadataProvider} and
+     * dispatches on the statement kind. The current thread is renamed for the duration of
+     * the call and its previous name is restored on exit, whether or not an exception occurred.
+     *
+     * @param hcc
+     *            the Hyracks client connection handed to the handlers that take one
+     * @param hdc
+     *            the Hyracks dataset client handed to handleQuery
+     * @param resultDelivery
+     *            the result delivery mode; ASYNC and ASYNC_DEFERRED switch the metadata provider
+     *            of a QUERY statement to asynchronous result mode
+     * @param stats
+     *            handed to handleQuery
+     * @throws Exception
+     *             anything raised while validating, rewriting or handling a statement; an
+     *             {@link AsterixException} when the statement kind matches none of the cases
+     */
+    @Override
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            Stats stats) throws Exception {
+        // Incremented once per QUERY statement so that each query gets its own ResultSetId.
+        int resultSetIdCounter = 0;
+        // outputFile and writerFactory carry over between iterations: a WRITE statement
+        // replaces them, and the metadata providers of later statements pick up the new values.
+        FileSplit outputFile = null;
+        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+        IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
+        // One map shared by every statement's metadata provider; SET statements add entries to it.
+        Map<String, String> config = new HashMap<>();
+        /* Since the system runs a large number of threads, when HTTP requests don't return, it becomes difficult to
+         * find the thread running the request to determine where it has stopped.
+         * Setting the thread name helps make that easier
+         */
+        String threadName = Thread.currentThread().getName();
+        Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+        try {
+            for (Statement stmt : statements) {
+                if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
+                    sessionConfig.out().println(APIServlet.HTML_STATEMENT_SEPARATOR);
+                }
+                validateOperation(activeDefaultDataverse, stmt);
+                rewriteStatement(stmt); // Rewrite the statement's AST.
+                // A fresh provider per statement, seeded with the current default dataverse
+                // and with the writer/output/config state accumulated so far.
+                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+                metadataProvider.setWriterFactory(writerFactory);
+                metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
+                metadataProvider.setOutputFile(outputFile);
+                metadataProvider.setConfig(config);
+                switch (stmt.getKind()) {
+                    case Statement.Kind.SET:
+                        handleSetStatement(stmt, config);
+                        break;
+                    case Statement.Kind.DATAVERSE_DECL:
+                        // Changes the default dataverse seen by all subsequent statements.
+                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.CREATE_DATAVERSE:
+                        handleCreateDataverseStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.DATASET_DECL:
+                        handleCreateDatasetStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.CREATE_INDEX:
+                        handleCreateIndexStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.TYPE_DECL:
+                        handleCreateTypeStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.NODEGROUP_DECL:
+                        handleCreateNodeGroupStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.DATAVERSE_DROP:
+                        handleDataverseDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.DATASET_DROP:
+                        handleDatasetDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.INDEX_DROP:
+                        handleIndexDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.TYPE_DROP:
+                        handleTypeDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.NODEGROUP_DROP:
+                        handleNodegroupDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.CREATE_FUNCTION:
+                        handleCreateFunctionStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.FUNCTION_DROP:
+                        handleFunctionDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.LOAD:
+                        handleLoadStatement(metadataProvider, stmt, hcc);
+                        break;
+                    // INSERT and UPSERT share one handler.
+                    case Statement.Kind.INSERT:
+                    case Statement.Kind.UPSERT:
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.DELETE:
+                        handleDeleteStatement(metadataProvider, stmt, hcc);
+                        break;
+                    // Primary and secondary feed creation share one handler.
+                    case Statement.Kind.CREATE_PRIMARY_FEED:
+                    case Statement.Kind.CREATE_SECONDARY_FEED:
+                        handleCreateFeedStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.DROP_FEED:
+                        handleDropFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.DROP_FEED_POLICY:
+                        handleDropFeedPolicyStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.CONNECT_FEED:
+                        handleConnectFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.DISCONNECT_FEED:
+                        handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.SUBSCRIBE_FEED:
+                        handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.CREATE_FEED_POLICY:
+                        handleCreateFeedPolicyStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.Kind.QUERY:
+                        // Each query gets the next result set id; both asynchronous delivery
+                        // modes put the provider into async result mode.
+                        metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                        metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                                || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
+                        break;
+                    case Statement.Kind.COMPACT:
+                        handleCompactStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.EXTERNAL_DATASET_REFRESH:
+                        handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.WRITE:
+                        // The writer factory is replaced only when the statement names one;
+                        // the output file is always replaced.
+                        Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(stmt);
+                        writerFactory = (result.first != null) ? result.first : writerFactory;
+                        outputFile = result.second;
+                        break;
+                    case Statement.Kind.RUN:
+                        handleRunStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.FUNCTION_DECL:
+                        // No op
+                        break;
+                    case Statement.Kind.EXTENSION:
+                        // Extension statements carry their own handling logic.
+                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc);
+                        break;
+                    default:
+                        // NOTE(review): reached for an unrecognized statement kind, although the
+                        // message says "function" -- consider rewording if no caller depends on it.
+                        throw new AsterixException("Unknown function");
+                }
+            }
+        } finally {
+            // Restore the thread's original name even when a statement failed.
+            Thread.currentThread().setName(threadName);
+        }
+    }
+
+    /**
+     * Applies a SET statement by storing its property in the given configuration map,
+     * replacing any value previously stored under the same property name.
+     *
+     * @param stmt
+     *            the statement; must be a {@link SetStatement}
+     * @param config
+     *            the configuration map to update
+     */
+    protected void handleSetStatement(Statement stmt, Map<String, String> config) {
+        SetStatement setStmt = (SetStatement) stmt;
+        config.put(setStmt.getPropName(), setStmt.getPropValue());
+    }
+
+    /**
+     * Translates a WRITE statement into the output target and, optionally, the writer
+     * factory to use for subsequent statements.
+     *
+     * @param stmt
+     *            the statement; must be a {@link WriteStatement}
+     * @return a pair whose first element is a writer factory instantiated reflectively from the
+     *         class name given in the statement, or {@code null} when the statement names none,
+     *         and whose second element is the file split built from the statement's node name
+     *         and file name
+     * @throws InstantiationException
+     *             if the named writer class cannot be instantiated
+     * @throws IllegalAccessException
+     *             if the named writer class or its no-argument constructor is not accessible
+     * @throws ClassNotFoundException
+     *             if the named writer class cannot be found
+     */
+    protected Pair<IAWriterFactory, FileSplit> handleWriteStatement(Statement stmt)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        WriteStatement writeStmt = (WriteStatement) stmt;
+        File target = new File(writeStmt.getFileName());
+        FileSplit split = new FileSplit(writeStmt.getNcName().getValue(), new FileReference(target));
+        // The class name comes straight from the statement text and is loaded reflectively.
+        String writerClassName = writeStmt.getWriterClassName();
+        IAWriterFactory factory = writerClassName == null ? null
+                : (IAWriterFactory) Class.forName(writerClassName).newInstance();
+        return new Pair<>(factory, split);
+    }
+
+    /**
+     * Resolves the dataverse named by a dataverse declaration.
+     * The lookup runs inside its own metadata transaction while holding the read lock on the
+     * dataverse name; the lock is released on every exit path.
+     *
+     * @param metadataProvider
+     *            receives the newly started metadata transaction context
+     * @param stmt
+     *            the statement; must be a {@link DataverseDecl}
+     * @return the dataverse found in the metadata
+     * @throws Exception
+     *             a {@link MetadataException} wrapping the failure when the dataverse does not
+     *             exist or the lookup/commit fails (the transaction is aborted first)
+     */
+    protected Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+            throws Exception {
+        String dataverseName = ((DataverseDecl) stmt).getDataverseName().getValue();
+        MetadataTransactionContext txnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(txnCtx);
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dataverseName);
+        Dataverse resolved;
+        try {
+            resolved = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                    dataverseName);
+            if (resolved == null) {
+                throw new MetadataException("Unknown dataverse " + dataverseName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(txnCtx);
+        } catch (Exception e) {
+            // Roll back, then surface every failure as a MetadataException.
+            abort(e, e, txnCtx);
+            throw new MetadataException(e);
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dataverseName);
+        }
+        return resolved;
+    }
+
+    /**
+     * Creates the dataverse named by a CREATE DATAVERSE statement.
+     * If a dataverse with that name already exists, the statement succeeds without changes
+     * when it was issued with "if not exists" and fails otherwise. The work runs inside its
+     * own metadata transaction while holding the read lock on the dataverse name; the
+     * transaction is aborted on failure and the lock is released on every exit path.
+     *
+     * @param metadataProvider
+     *            receives the newly started metadata transaction context
+     * @param stmt
+     *            the statement; must be a {@link CreateDataverseStatement}
+     * @throws Exception
+     *             an {@link AlgebricksException} when the dataverse exists and "if not exists"
+     *             was not given; otherwise whatever the metadata operations throw
+     */
+    protected void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+            throws Exception {
+        CreateDataverseStatement createStmt = (CreateDataverseStatement) stmt;
+        String dataverseName = createStmt.getDataverseName().getValue();
+
+        MetadataTransactionContext txnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(txnCtx);
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dataverseName);
+        try {
+            Dataverse existing = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                    dataverseName);
+            if (existing != null) {
+                if (!createStmt.getIfNotExists()) {
+                    throw new AlgebricksException(
+                            "A dataverse with this name " + dataverseName + " already exists.");
+                }
+                // Already present and tolerated by the statement: nothing to add.
+                MetadataManager.INSTANCE.commitTransaction(txnCtx);
+                return;
+            }
+            MetadataTransactionContext activeCtx = metadataProvider.getMetadataTxnContext();
+            Dataverse created = new Dataverse(dataverseName, createStmt.getFormat(),
+                    IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addDataverse(activeCtx, created);
+            MetadataManager.INSTANCE.commitTransaction(txnCtx);
+        } catch (Exception e) {
+            abort(e, e, txnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dataverseName);
+        }
+    }
+
+    /**
+     * Checks that a compaction policy exists and that the supplied properties match it.
+     * The policy is looked up in the metadata dataverse and its merge policy factory is
+     * instantiated reflectively from the class name stored with the policy.
+     *
+     * @param compactionPolicy
+     *            the name of the compaction policy
+     * @param compactionPolicyProperties
+     *            the supplied properties; may be {@code null} only for the "no-merge" policy
+     * @param mdTxnCtx
+     *            the metadata transaction used for the lookup
+     * @param isExternalDataset
+     *            whether the policy is meant for an external dataset; such datasets may not use
+     *            the "correlated-prefix" policy
+     * @throws AsterixException
+     *             if the policy is unknown, is not allowed for an external dataset, or the
+     *             supplied properties are missing, incomplete or contain an unknown name
+     * @throws Exception
+     *             if the lookup fails or the factory class cannot be loaded or instantiated
+     */
+    protected void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
+            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws AsterixException, Exception {
+        CompactionPolicy policyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
+        if (policyEntity == null) {
+            throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
+        }
+        Class<?> factoryClass = Class.forName(policyEntity.getClassName());
+        ILSMMergePolicyFactory factory = (ILSMMergePolicyFactory) factoryClass.newInstance();
+        if (isExternalDataset && factory.getName().equals("correlated-prefix")) {
+            throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
+        }
+        if (compactionPolicyProperties == null) {
+            // Only the no-merge policy may be used without any properties.
+            if (!factory.getName().equals("no-merge")) {
+                throw new AsterixException("Compaction policy properties are missing.");
+            }
+            return;
+        }
+        // Every supplied property must be one the policy knows ...
+        for (String suppliedName : compactionPolicyProperties.keySet()) {
+            if (!factory.getPropertiesNames().contains(suppliedName)) {
+                throw new AsterixException("Invalid compaction policy property: " + suppliedName);
+            }
+        }
+        // ... and every property the policy declares must have been supplied.
+        for (String requiredName : factory.getPropertiesNames()) {
+            if (!compactionPolicyProperties.containsKey(requiredName)) {
+                throw new AsterixException("Missing compaction policy property: " + requiredName);
+            }
+        }
+    }
+
+    protected void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws AsterixException, Exception {
+        MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
+        DatasetDecl dd = (DatasetDecl) stmt;
+        String dataverseName = getActiveDataverse(dd.getDataverse());
+        String datasetName = dd.getName().getValue();
+        DatasetType dsType = dd.getDatasetType();
+        String itemTypeDataverseName = dd.getItemTypeDataverse().getValue();
+        String itemTypeName = dd.getItemTypeName().getValue();
+        String metaItemTypeDataverseName = dd.getMetaItemTypeDataverse().getValue();
+        String metaItemTypeName = dd.getMetaItemTypeName().getValue();
+        Identifier ngNameId = dd.getNodegroupName();
+        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
+        String compactionPolicy = dd.getCompactionPolicy();
+        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
+        boolean defaultCompactionPolicy = compactionPolicy == null;
+        boolean temp = dd.getDatasetDetailsDecl().isTemp();
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, itemTypeDataverseName,
+                itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
+                metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
+                dataverseName + "." + datasetName, defaultCompactionPolicy);
+        Dataset dataset = null;
+        try {
+
+            IDatasetDetails datasetDetails = null;
+            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds != null) {
+                if (dd.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+                }
+            }
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    itemTypeDataverseName, itemTypeName);
+            if (dt == null) {
+                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+            }
+            String ngName = ngNameId != null ? ngNameId.getValue()
+                    : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
+
+            if (compactionPolicy == null) {
+                compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
+                compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
+            } else {
+                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
+            }
+            switch (dd.getDatasetType()) {
+                case INTERNAL:
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Dataset type has to be a record type.");
+                    }
+
+                    IAType metaItemType = null;
+                    if (metaItemTypeDataverseName != null && metaItemTypeName != null) {
+                        metaItemType = metadataProvider.findType(metaItemTypeDataverseName, metaItemTypeName);
+                    }
+                    if (metaItemType != null && metaItemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Dataset meta type has to be a record type.");
+                    }
+                    ARecordType metaRecType = (ARecordType) metaItemType;
+
+                    List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    List<Integer> keySourceIndicators = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getKeySourceIndicators();
+                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
+                    ARecordType aRecordType = (ARecordType) itemType;
+                    List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType,
+                            metaRecType, partitioningExprs, keySourceIndicators, autogenerated);
+
+                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
+                    if (filterField != null) {
+                        ValidateUtil.validateFilterField(aRecordType, filterField);
+                    }
+                    if (compactionPolicy == null && filterField != null) {
+                        // If the dataset has a filter and the user didn't specify a merge
+                        // policy, then we will pick the
+                        // correlated-prefix as the default merge policy.
+                        compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
+                        compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
+                    }
+                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            keySourceIndicators, partitioningTypes, autogenerated, filterField, temp);
+                    break;
+                case EXTERNAL:
+                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getProperties();
+
+                    datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
+                            ExternalDatasetTransactionState.COMMIT);
+                    break;
+                default:
+                    throw new AsterixException("Unknown datatype " + dd.getDatasetType());
+            }
+
+            // #. initialize DatasetIdFactory if it is not initialized.
+            if (!DatasetIdFactory.isInitialized()) {
+                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
+            }
+
+            // #. add a new dataset with PendingAddOp
+            dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName,
+                    metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties,
+                    datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
+                    IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+            if (dd.getDatasetType() == DatasetType.INTERNAL) {
+                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                        dataverseName);
+                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+                        metadataProvider);
+
+                // #. make metadataTxn commit before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
+
+                // #. runJob
+                JobUtils.runJob(hcc, jobSpec, true);
+
+                // #. begin new metadataTxn
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            }
+
+            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+
+                // #. execute compensation operations
+                // remove the index in NC
+                // [Notice]
+                // As long as we updated(and committed) metadata, we should remove any effect of the job
+                // because an exception occurs during runJob.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                try {
+                    JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+                    JobUtils.runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+
+                // remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, itemTypeDataverseName,
+                    itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
+                    metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
+                    dataverseName + "." + datasetName, defaultCompactionPolicy);
+        }
+    }
+
+    /**
+     * Rejects the current operation when a feed is connected to the given dataset.
+     * <p>
+     * Every registered active-entity listener that is a {@link FeedEventsListener} is asked whether it is
+     * connected to {@code datasetName}; the entity ids of all matching listeners are collected and reported
+     * together in the exception message.
+     *
+     * @param dataverseName
+     *            dataverse of the dataset; only used to build the error message
+     * @param datasetName
+     *            name of the dataset that is checked against the feed listeners
+     * @throws AsterixException
+     *             if at least one feed listener reports a connection to the dataset
+     */
+    protected void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
+        StringBuilder connectedFeeds = new StringBuilder();
+        for (IActiveEntityEventsListener candidate : ActiveJobNotificationHandler.INSTANCE.getEventListeners()) {
+            if (!(candidate instanceof FeedEventsListener)) {
+                continue;
+            }
+            if (((FeedEventsListener) candidate).isConnectedToDataset(datasetName)) {
+                // One entity id per line; each entry adds at least the newline, so a non-empty
+                // buffer means at least one connected feed was found.
+                connectedFeeds.append(candidate.getEntityId()).append("\n");
+            }
+        }
+        if (connectedFeeds.length() == 0) {
+            return;
+        }
+        throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being "
+                + "fed into by the following feed(s).\n" + connectedFeeds.toString() + "\n" + "Operation not supported");
+    }
+
+    /**
+     * Computes the name of the node group a dataset declaration refers to, without touching the metadata.
+     *
+     * @param ngNameId
+     *            explicitly requested node group, or {@code null} if none was given
+     * @param dd
+     *            the dataset declaration whose hints and name are consulted
+     * @param dataverse
+     *            the dataverse name used as the prefix of a dataset-specific node group name
+     * @return the explicit node group name if present; otherwise {@code dataverse:datasetName} when the
+     *         nodegroup-cardinality hint is set, or the default metadata node group name when it is not
+     */
+    protected String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
+        if (ngNameId == null) {
+            boolean hasCardinalityHint = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME) != null;
+            return hasCardinalityHint ? dataverse + ":" + dd.getName().getValue()
+                    : MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+        }
+        return ngNameId.getValue();
+    }
+
+    /**
+     * Determines the node group that will host a dataset being created and, when the statement carries a
+     * nodegroup-cardinality hint, registers a new node group for it.
+     * <p>
+     * Without the hint the default metadata node group name is returned and nothing is added to the metadata.
+     * With the hint, the metadata node is always chosen first and the remaining members are drawn at random,
+     * without repetition, from the other configured nodes; the group is named {@code dataverse:datasetName}
+     * and added through {@code mdTxnCtx}.
+     *
+     * @param dd
+     *            the dataset declaration supplying the hints and the dataset name
+     * @param dataverse
+     *            the dataverse name used as the prefix of the generated node group name
+     * @param mdTxnCtx
+     *            the metadata transaction in which the new node group is recorded
+     * @return the name of the node group to use for the dataset
+     * @throws AsterixException
+     *             if the cardinality hint fails validation
+     */
+    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
+            MetadataTransactionContext mdTxnCtx)
+            throws AsterixException {
+        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        if (hintValue == null) {
+            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+        }
+
+        boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME, hintValue).first;
+        if (!valid) {
+            throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
+        }
+        int nodegroupCardinality = Integer.parseInt(hintValue);
+
+        List<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+        List<String> nodeNamesClone = new ArrayList<>(nodeNames);
+        String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+
+        // The metadata node is always a member; it is removed from the candidates so it is not drawn again.
+        List<String> selectedNodes = new ArrayList<>();
+        selectedNodes.add(metadataNodeName);
+        int numChosen = 1;
+        nodeNamesClone.remove(metadataNodeName);
+
+        if (numChosen < nodegroupCardinality) {
+            Random random = new Random();
+            String[] nodes = nodeNamesClone.toArray(new String[] {});
+            int[] b = new int[nodes.length];
+            for (int i = 0; i < b.length; i++) {
+                b[i] = i;
+            }
+
+            // Partial Fisher-Yates shuffle: slots [0, i) hold indices that were already picked,
+            // slots [i, n) hold the indices that are still available.
+            // NOTE(review): nextInt needs a positive bound, so this assumes the validated hint never
+            // asks for more nodes than are available - TODO confirm against DatasetHints.validate.
+            for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
+                int selected = i + random.nextInt(nodes.length - i);
+                int selNodeIndex = b[selected];
+                selectedNodes.add(nodes[selNodeIndex]);
+                // Move the picked index into slot i (not slot 0) so that it leaves the range later
+                // draws sample from; swapping with slot 0 allowed the same node to be picked twice.
+                b[selected] = b[i];
+                b[i] = selNodeIndex;
+            }
+        }
+
+        String nodegroupName = dataverse + ":" + dd.getName().getValue();
+        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
+        return nodegroupName;
+    }
+
+    /**
+     * Handles a create-index statement.
+     * <p>
+     * The method validates the request against the metadata, records the new index with
+     * {@code PENDING_ADD_OP}, runs the jobs that create and then load the index, and finally replaces the
+     * metadata record with one marked {@code PENDING_NO_OP}. Metadata transactions are committed before each
+     * job is run and a new one is begun afterwards.
+     * <p>
+     * For a dataset that is not {@code INTERNAL} (treated as external), the first index additionally takes a
+     * snapshot of the external files, registers a files index and runs a job to replicate it.
+     * <p>
+     * On failure the active metadata transaction (if any) is aborted, compensation jobs and metadata drops
+     * are attempted, secondary failures are attached to the original exception as suppressed, and the
+     * original exception is rethrown.
+     *
+     * @param metadataProvider
+     *            metadata provider whose metadata transaction context is (re)set by this method
+     * @param stmt
+     *            the statement; it is cast to {@link CreateIndexStatement}
+     * @param hcc
+     *            client connection passed to {@code JobUtils.runJob}
+     * @throws Exception
+     *             the original failure after compensation has been attempted; an
+     *             {@link IllegalStateException} is thrown instead when pending metadata records could not
+     *             be removed during compensation
+     */
+    protected void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+        String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
+        String datasetName = stmtCreateIndex.getDatasetName().getValue();
+        List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
+
+        // bActiveTxn tracks whether mdTxnCtx is still open, i.e. whether the catch block must abort it.
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
+        String indexName = null;
+        JobSpecification spec = null;
+        Dataset ds = null;
+        // For external datasets
+        ArrayList<ExternalFile> externalFilesSnapshot = null;
+        boolean firstExternalDatasetIndex = false;
+        boolean filesIndexReplicated = false;
+        Index filesIndex = null;
+        boolean datasetLocked = false;
+        try {
+            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds == null) {
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
+            }
+
+            indexName = stmtCreateIndex.getIndexName().getValue();
+            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName, indexName);
+            // NOTE(review): dt is dereferenced below without a null check - confirm that the dataset's
+            // item type can never be missing at this point.
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    ds.getItemTypeDataverseName(), ds.getItemTypeName());
+            ARecordType aRecordType = (ARecordType) dt.getDatatype();
+            ARecordType metaRecordType = null;
+            if (ds.hasMetaPart()) {
+                Datatype metaDt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                        ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName());
+                metaRecordType = (ARecordType) metaDt.getDatatype();
+            }
+
+            // Resolve the type of every key field, either from the record type or from the type
+            // expression given in the statement.
+            List<List<String>> indexFields = new ArrayList<>();
+            List<IAType> indexFieldTypes = new ArrayList<>();
+            int keyIndex = 0;
+            for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
+                IAType fieldType = null;
+                ARecordType subType = KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType,
+                        metaRecordType);
+                boolean isOpen = subType.isOpen();
+                int i = 0;
+                // For a nested field path, descend through the enclosing record types until either the
+                // parent of the last path element or an open record type is reached.
+                if (fieldExpr.first.size() > 1 && !isOpen) {
+                    while (i < fieldExpr.first.size() - 1 && !isOpen) {
+                        subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i));
+                        i++;
+                        isOpen = subType.isOpen();
+                    }
+                }
+                if (fieldExpr.second == null) {
+                    // No explicit type: look the remaining path up in the record type reached above.
+                    fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
+                } else {
+                    // Explicit type: only accepted for enforced indexes on open types of the record part.
+                    if (!stmtCreateIndex.isEnforced()) {
+                        throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first
+                                + "\" field without enforcing it's type");
+                    }
+                    if (!isOpen) {
+                        throw new AlgebricksException("Typed index on \"" + fieldExpr.first
+                                + "\" field could be created only for open datatype");
+                    }
+                    if (stmtCreateIndex.hasMetaField()) {
+                        throw new AlgebricksException("Typed open index can only be created on the record part");
+                    }
+                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
+                            indexName, dataverseName);
+                    TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
+                    fieldType = typeMap.get(typeSignature);
+                }
+                if (fieldType == null) {
+                    throw new AlgebricksException(
+                            "Unknown type " + (fieldExpr.second == null ? fieldExpr.first : fieldExpr.second));
+                }
+
+                indexFields.add(fieldExpr.first);
+                indexFieldTypes.add(fieldType);
+                ++keyIndex;
+            }
+
+            ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators,
+                    indexFieldTypes, stmtCreateIndex.getIndexType());
+
+            // The existence check runs only after the key fields were validated, so an invalid statement
+            // fails even when "if not exists" is given and the index already exists.
+            if (idx != null) {
+                if (stmtCreateIndex.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+                }
+            }
+
+            // Checks whether a user is trying to create an inverted secondary index on a dataset
+            // with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary solution, we print an
+            // error message and stop.
+            if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+                List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds);
+                for (List<String> partitioningKey : partitioningKeys) {
+                    IAType keyType = aRecordType.getSubFieldType(partitioningKey);
+                    ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+
+                    // If it is not a fixed length
+                    if (typeTrait.getFixedLength() < 0) {
+                        throw new AlgebricksException("The keyword or ngram index -" + indexName
+                                + " cannot be created on the dataset -" + datasetName
+                                + " due to its variable-length primary key field - " + partitioningKey);
+                    }
+
+                }
+            }
+
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
+                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
+            } else {
+                // External dataset
+                // Check if the dataset is indexible
+                if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
+                    throw new AlgebricksException(
+                            "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
+                                    + " Adapter can't be indexed");
+                }
+                // Check if the name of the index is valid
+                if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
+                    throw new AlgebricksException("external dataset index name is invalid");
+                }
+
+                // Check if the files index exist
+                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                firstExternalDatasetIndex = filesIndex == null;
+                // Lock external dataset
+                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                datasetLocked = true;
+                if (firstExternalDatasetIndex) {
+                    // Verify that no one has created an index before we acquire the lock
+                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                            dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                    if (filesIndex != null) {
+                        // Someone else created the files index in the meantime: release the lock and
+                        // take it again as a non-first index build.
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+                        firstExternalDatasetIndex = false;
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                    }
+                }
+                if (firstExternalDatasetIndex) {
+                    // Get snapshot from External File System
+                    externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
+                    // Add an entry for the files index
+                    filesIndex = new Index(dataverseName, datasetName,
+                            ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+                            IMetadataEntity.PENDING_ADD_OP);
+                    MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
+                    // Add files to the external files index
+                    for (ExternalFile file : externalFilesSnapshot) {
+                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+                    }
+                    // This is the first index for the external dataset, replicate the files index
+                    spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
+                            metadataProvider, true);
+                    if (spec == null) {
+                        throw new AsterixException(
+                                "Failed to create job spec for replicating Files Index For external dataset");
+                    }
+                    // Set before the job runs so that a failing job still triggers the cleanup in the
+                    // catch block.
+                    filesIndexReplicated = true;
+                    JobUtils.runJob(hcc, spec, true);
+                }
+            }
+
+            // check whether there exists another enforced index on the same field
+            if (stmtCreateIndex.isEnforced()) {
+                List<Index> indexes = MetadataManager.INSTANCE
+                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+                for (Index index : indexes) {
+                    if (index.getKeyFieldNames().equals(indexFields)
+                            && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds()) {
+                        throw new AsterixException("Cannot create index " + indexName + " , enforced index "
+                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
+                                + "\" is already defined with type \"" + index.getKeyFieldTypes() + "\"");
+                    }
+                }
+            }
+
+            // #. add a new index with PendingAddOp
+            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
+                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
+                    false, IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+
+            ARecordType enforcedType = null;
+            if (stmtCreateIndex.isEnforced()) {
+                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType,
+                        Lists.newArrayList(index));
+            }
+
+            // #. prepare to create the index artifact in NC.
+            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+                    index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
+                    index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
+            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType,
+                    keySourceIndicators, enforcedType, metadataProvider);
+            if (spec == null) {
+                throw new AsterixException("Failed to create job spec for creating index '"
+                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+            }
+            // The pending-add record is committed before the job runs; from here on a failure needs the
+            // compensation steps guarded by the progress state.
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+            // #. create the index artifact in NC.
+            JobUtils.runJob(hcc, spec, true);
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            // #. load data into the index in NC.
+            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+                    index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
+                    index.getGramLength(), index.getIndexType());
+
+            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType,
+                    keySourceIndicators, enforcedType, metadataProvider);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            JobUtils.runJob(hcc, spec, true);
+
+            // #. begin new metadataTxn
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                    indexName);
+            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+            // add another new files index with PendingNoOp after deleting the index with
+            // PendingAddOp
+            if (firstExternalDatasetIndex) {
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                        filesIndex.getIndexName());
+                filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+                MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
+                // update transaction timestamp
+                ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
+                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            // Abort whatever metadata transaction is still open, then undo the side effects that were
+            // already made durable. Secondary failures are attached to e as suppressed exceptions.
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            // If files index was replicated for external dataset, it should be cleaned up on NC side
+            if (filesIndexReplicated) {
+                // NOTE(review): unlike the compensation steps below, the new transaction is not set on
+                // metadataProvider here - confirm whether buildDropFilesIndexJobSpec relies on it.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                        ExternalIndexingOperations.getFilesIndexName(datasetName));
+                try {
+                    JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
+                            metadataProvider, ds);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+                    JobUtils.runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                // #. execute compensation operations
+                // remove the index in NC
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+                try {
+                    JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+                            ds);
+
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+                    JobUtils.runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+
+                if (firstExternalDatasetIndex) {
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    try {
+                        // Drop External Files from metadata
+                        MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        abort(e, e2, mdTxnCtx);
+                        throw new IllegalStateException("System is inconsistent state: pending files for("
+                                + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
+                    }
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    try {
+                        // Drop the files index from metadata
+                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                                datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        abort(e, e2, mdTxnCtx);
+                        throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
+                                + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName)
+                                + ") couldn't be removed from the metadata", e);
+                    }
+                }
+                // remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName, indexName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
+                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                }
+            }
+            throw e;
+        } finally {
+            // Always release the index lock; the external dataset lock is released only if it was taken.
+            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
+            if (datasetLocked) {
+                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+            }
+        }
+    }
+
+    /**
+     * Handles a create-type statement by registering the declared datatype in the metadata.
+     * <p>
+     * The work is done inside a single metadata transaction while the create-type lock for
+     * {@code <dataverse>.<type>} is held. The transaction is committed on success and handed to
+     * {@code abort} on any exception; the lock is released in both cases.
+     *
+     * @param metadataProvider
+     *            provider that is bound to the metadata transaction opened here
+     * @param stmt
+     *            the statement to process; must be a {@link TypeDecl}
+     * @throws Exception
+     *             if the dataverse is unknown, if the type already exists and the statement was not
+     *             issued with "if not exists", if the name collides with a builtin type, or if any
+     *             metadata operation fails
+     */
+    protected void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+        TypeDecl typeDecl = (TypeDecl) stmt;
+        String dataverseName = getActiveDataverse(typeDecl.getDataverseName());
+        String typeName = typeDecl.getIdent().getValue();
+        MetadataTransactionContext txn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(txn);
+        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
+        try {
+            // The target dataverse must exist before anything else is looked up.
+            if (MetadataManager.INSTANCE.getDataverse(txn, dataverseName) == null) {
+                throw new AlgebricksException("Unknown dataverse " + dataverseName);
+            }
+            Datatype existing = MetadataManager.INSTANCE.getDatatype(txn, dataverseName, typeName);
+            if (existing != null) {
+                // An existing type is only tolerated when the statement said "if not exists".
+                if (!typeDecl.getIfNotExists()) {
+                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+                }
+            } else if (builtinTypeMap.get(typeName) != null) {
+                throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+            } else {
+                Map<TypeSignature, IAType> computedTypes = TypeTranslator.computeTypes(txn, typeDecl.getTypeDef(),
+                        typeDecl.getIdent().getValue(), dataverseName);
+                IAType newType = computedTypes.get(new TypeSignature(dataverseName, typeName));
+                MetadataManager.INSTANCE.addDatatype(txn, new Datatype(dataverseName, typeName, newType, false));
+            }
+            MetadataManager.INSTANCE.commitTransaction(txn);
+        } catch (Exception e) {
+            abort(e, e, txn);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
+        }
+    }
+
+    /**
+     * Handles a drop-dataverse statement.
+     * <p>
+     * The drop proceeds in three phases while the dataverse write lock is held:
+     * <ol>
+     * <li>In a first metadata transaction: disconnect the feeds of the dataverse, collect the jobs that
+     * drop feed storage, indexes, datasets and the dataverse itself, and replace the dataverse record
+     * with one flagged {@code PENDING_DROP_OP}. The transaction is then committed.</li>
+     * <li>Run the collected jobs.</li>
+     * <li>In a second metadata transaction: delete the dataverse record and, if the dropped dataverse
+     * was the active default dataverse, clear that reference.</li>
+     * </ol>
+     * If a failure happens after the pending-drop record was committed, the collected jobs are run
+     * again as compensation and the dataverse record is removed in a fresh transaction.
+     *
+     * @param metadataProvider
+     *            provider that is bound to the metadata transactions opened here
+     * @param stmt
+     *            the statement to process; must be a {@link DataverseDropStatement}
+     * @param hcc
+     *            connection used to run the drop jobs
+     * @throws Exception
+     *             the original failure (with compensation failures attached as suppressed exceptions),
+     *             or an {@link IllegalStateException} if the pending dataverse record could not be
+     *             removed during compensation
+     */
+    protected void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+        String dataverseName = stmtDelete.getDataverseName().getValue();
+
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
+        List<JobSpecification> jobsToExecute = new ArrayList<>();
+        try {
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                if (stmtDelete.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
+                }
+            }
+            // #. disconnect all feeds from any datasets in the dataverse.
+            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            Identifier dvId = new Identifier(dataverseName);
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                EntityId activeEntityId = listener.getEntityId();
+                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && activeEntityId.getDataverse().equals(dataverseName)) {
+                    FeedEventsListener feedEventListener = (FeedEventsListener) listener;
+                    FeedConnectionId[] connections = feedEventListener.getConnections();
+                    for (FeedConnectionId conn : connections) {
+                        disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc);
+                    }
+                    // prepare job to remove feed log storage
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
+                }
+            }
+
+            // #. prepare jobs which will drop corresponding datasets with indexes.
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
+            for (Dataset dataset : datasets) {
+                String datasetName = dataset.getDatasetName();
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                        datasetName);
+                if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+                    for (Index index : indexes) {
+                        if (index.isSecondaryIndex()) {
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                    index.getIndexName());
+                            jobsToExecute.add(
+                                    IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, dataset));
+                        }
+                    }
+
+                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+                } else {
+                    // External dataset: the files index and the other indexes use different drop jobs.
+                    for (Index index : indexes) {
+                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                index.getIndexName());
+                        if (ExternalIndexingOperations.isFileIndex(index)) {
+                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
+                                    metadataProvider, dataset));
+                        } else {
+                            jobsToExecute.add(
+                                    IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, dataset));
+                        }
+                    }
+                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
+                }
+            }
+            jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
+            // #. mark PendingDropOp on the dataverse record by
+            // first, deleting the dataverse record from the DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp value into the
+            // DATAVERSE_DATASET
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+                    new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+            for (JobSpecification jobSpec : jobsToExecute) {
+                JobUtils.runJob(hcc, jobSpec, true);
+            }
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            // #. finally, delete the dataverse.
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            // Compare names by value: reference equality on strings would almost never match and
+            // would leave a stale active dataverse behind.
+            if (activeDefaultDataverse != null
+                    && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
+                activeDefaultDataverse = null;
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                // The dataverse is already flagged as pending drop, so it must not stay active.
+                if (activeDefaultDataverse != null
+                        && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
+                    activeDefaultDataverse = null;
+                }
+
+                // #. execute compensation operations
+                // remove the all indexes in NC
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        JobUtils.runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    // do no throw exception since still the metadata needs to be compensated.
+                    e.addSuppressed(e2);
+                }
+
+                // remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                try {
+                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName
+                            + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
+        }
+    }
+
+    /**
+     * Disconnects one feed connection as a preparatory step of a drop operation.
+     * <p>
+     * This is best effort: a failure to disconnect is logged at WARNING level (including the stack
+     * trace of the failure) and is not propagated to the caller.
+     *
+     * @param dvId
+     *            identifier of the dataverse the feed belongs to
+     * @param activeEntityId
+     *            id of the active feed entity; its entity name is used as the feed name
+     * @param conn
+     *            the connection to tear down; its dataset name is used as the target dataset
+     * @param metadataProvider
+     *            provider passed on to the disconnect handler
+     * @param hcc
+     *            connection passed on to the disconnect handler
+     */
+    protected void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn,
+            AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc) {
+        DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId,
+                new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName()));
+        try {
+            handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset "
+                        + conn.getDatasetName());
+            }
+        } catch (Exception exception) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                // Pass the throwable itself so the stack trace is not lost in the log.
+                LOGGER.log(Level.WARNING, "Unable to disconnect feed " + activeEntityId.getEntityName()
+                        + " from dataset " + conn.getDatasetName() + ". Encountered exception " + exception,
+                        exception);
+            }
+        }
+    }
+
+    protected Dataset getDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName)
+            throws MetadataException {
+        return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+    }
+
+    /**
+     * Handles a drop-dataset statement.
+     * <p>
+     * Opens a metadata transaction, takes the drop-dataset lock for {@code <dataverse>.<dataset>},
+     * and delegates the actual drop to {@code doDropDataset}. The transaction current at the end of
+     * that call is committed here. If a failure occurs after the pending-drop record was committed,
+     * the collected jobs are run again as compensation and the dataset record is removed in a new
+     * transaction. The lock is released in all cases.
+     *
+     * @param metadataProvider
+     *            provider that is bound to the metadata transactions opened here
+     * @param stmt
+     *            the statement to process; must be a {@link DropDatasetStatement}
+     * @param hcc
+     *            connection used to run the drop jobs
+     * @throws Exception
+     *             the original failure (with compensation failures attached as suppressed exceptions),
+     *             or an {@link IllegalStateException} if the pending dataset record could not be
+     *             removed during compensation
+     */
+    protected void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        DropDatasetStatement dropStmt = (DropDatasetStatement) stmt;
+        String dataverseName = getActiveDataverse(dropStmt.getDataverseName());
+        String datasetName = dropStmt.getDatasetName().getValue();
+        MutableObject<ProgressState> progressRef = new MutableObject<>(ProgressState.NO_PROGRESS);
+        MutableObject<MetadataTransactionContext> txnRef = new MutableObject<>(
+                MetadataManager.INSTANCE.beginTransaction());
+        MutableBoolean txnActive = new MutableBoolean(true);
+        metadataProvider.setMetadataTxnContext(txnRef.getValue());
+        String lockName = dataverseName + "." + datasetName;
+        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, lockName);
+        List<JobSpecification> dropJobs = new ArrayList<>();
+        try {
+            Dataset target = MetadataManager.INSTANCE.getDataset(txnRef.getValue(), dataverseName, datasetName);
+            if (target == null) {
+                // A missing dataset is only acceptable for "drop ... if exists".
+                if (!dropStmt.getIfExists()) {
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName
+                            + " in dataverse " + dataverseName + ".");
+                }
+                MetadataManager.INSTANCE.commitTransaction(txnRef.getValue());
+                return;
+            }
+
+            doDropDataset(target, datasetName, metadataProvider, txnRef, dropJobs, dataverseName, txnActive,
+                    progressRef, hcc);
+            MetadataManager.INSTANCE.commitTransaction(txnRef.getValue());
+        } catch (Exception e) {
+            if (txnActive.booleanValue()) {
+                abort(e, e, txnRef.getValue());
+            }
+
+            boolean pendingRecordCommitted =
+                    progressRef.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+            if (pendingRecordCommitted) {
+                // Compensation, part 1: run the drop jobs again to remove the indexes in NC.
+                try {
+                    for (JobSpecification job : dropJobs) {
+                        JobUtils.runJob(hcc, job, true);
+                    }
+                } catch (Exception jobFailure) {
+                    // Do not rethrow: the metadata still has to be compensated below.
+                    e.addSuppressed(jobFailure);
+                }
+
+                // Compensation, part 2: remove the pending record from the metadata.
+                txnRef.setValue(MetadataManager.INSTANCE.beginTransaction());
+                metadataProvider.setMetadataTxnContext(txnRef.getValue());
+                try {
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName);
+                    MetadataManager.INSTANCE.commitTransaction(txnRef.getValue());
+                } catch (Exception metadataFailure) {
+                    e.addSuppressed(metadataFailure);
+                    abort(e, metadataFailure, txnRef.getValue());
+                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, lockName);
+        }
+    }
+
+    /**
+     * Performs the drop of a single dataset on behalf of a drop-dataset statement.
+     * <p>
+     * For both internal and external datasets the sequence is: collect the drop jobs, replace the
+     * dataset record with one flagged {@code PENDING_DROP_OP}, commit the current metadata
+     * transaction, run the jobs, and begin a new metadata transaction in which the dataset record
+     * (and, unless the dataset uses the default nodegroup, its nodegroup) is dropped. The new
+     * transaction is NOT committed here; it is left in {@code mdTxnCtx} for the caller.
+     * <p>
+     * For internal datasets, feeds connected to the dataset are disconnected before the drop jobs
+     * run. For external datasets, the files index is dropped with the files-index drop job and every
+     * other index with the secondary-index drop job, matching the handling in the dataverse drop.
+     *
+     * @param ds
+     *            the dataset to drop
+     * @param datasetName
+     *            name of the dataset
+     * @param metadataProvider
+     *            provider used to build the job specifications; rebound to the new transaction
+     * @param mdTxnCtx
+     *            holder of the current metadata transaction; replaced by a fresh transaction after
+     *            the jobs have run
+     * @param jobsToExecute
+     *            output list that receives the drop jobs, so the caller can rerun them on failure
+     * @param dataverseName
+     *            name of the dataverse that owns the dataset
+     * @param bActiveTxn
+     *            set to {@code false} after the first commit and back to {@code true} once the new
+     *            transaction has begun
+     * @param progress
+     *            set to {@code ADDED_PENDINGOP_RECORD_TO_METADATA} after the first commit
+     * @param hcc
+     *            connection used to run the jobs
+     * @throws Exception
+     *             if any metadata operation or job fails
+     */
+    protected void doDropDataset(Dataset ds, String datasetName, AqlMetadataProvider metadataProvider,
+            MutableObject<MetadataTransactionContext> mdTxnCtx, List<JobSpecification> jobsToExecute,
+            String dataverseName, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress,
+            IHyracksClientConnection hcc) throws Exception {
+        Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
+        if (ds.getDatasetType() == DatasetType.INTERNAL) {
+            // prepare job spec(s) that would disconnect any active feeds involving the dataset.
+            IActiveEntityEventsListener[] feedConnections = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            for (IActiveEntityEventsListener conn : feedConnections) {
+                if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) {
+                    FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName);
+                    Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
+                            connectionId);
+                    disconnectJobList.put(connectionId, p);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName() + " from dataset "
+                                + datasetName + " as dataset is being dropped");
+                    }
+                    // prepare job to remove feed log storage
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(
+                            mdTxnCtx.getValue(), connectionId.getFeedId().getDataverse(),
+                            connectionId.getFeedId().getEntityName())));
+                }
+            }
+
+            // #. prepare jobs to drop the dataset and the indexes in NC
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName,
+                    datasetName);
+            for (Index index : indexes) {
+                if (index.isSecondaryIndex()) {
+                    CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                            index.getIndexName());
+                    jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+                }
+            }
+            CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+            jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+
+            // #. mark the existing dataset as PendingDropOp
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
+                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
+                            ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(),
+                            ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
+                            ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            bActiveTxn.setValue(false);
+            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
+
+            // # disconnect the feeds
+            for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
+                JobUtils.runJob(hcc, p.first, true);
+            }
+
+            // #. run the jobs
+            for (JobSpecification jobSpec : jobsToExecute) {
+                JobUtils.runJob(hcc, jobSpec, true);
+            }
+
+            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
+            bActiveTxn.setValue(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+        } else {
+            // External dataset
+            ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
+            // #. prepare jobs to drop the dataset and the indexes in NC
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName,
+                    datasetName);
+            for (Index index : indexes) {
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                        index.getIndexName());
+                // The files index needs the files-index drop job; all other indexes are dropped as
+                // secondary indexes (same dispatch as in the dataverse drop).
+                if (ExternalIndexingOperations.isFileIndex(index)) {
+                    jobsToExecute.add(
+                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
+                } else {
+                    jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+                }
+            }
+
+            // #. mark the existing dataset as PendingDropOp
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
+                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
+                            ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(),
+                            ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
+                            IMetadataEntity.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            bActiveTxn.setValue(false);
+            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
+
+            // #. run the jobs
+            for (JobSpecification jobSpec : jobsToExecute) {
+                JobUtils.runJob(hcc, jobSpec, true);
+            }
+            // NOTE(review): the dataset info was already removed before the jobs were built; this
+            // second removal is kept as in the original code - confirm whether it is still needed.
+            if (!indexes.isEmpty()) {
+                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
+            }
+            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
+            bActiveTxn.setValue(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+        }
+
+        // #. finally, delete the dataset.
+        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
+        // Drop the associated nodegroup
+        String nodegroup = ds.getNodeGroupName();
+        if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
+            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName);
+        }
+    }
+
+    protected void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+
+        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+        String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
+        String indexName = null;
+        // For external index
+        boolean dropFilesIndex = false;
+        List<JobSpecification> jobsToExecute = new ArrayList<>();
+        try {
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (ds == null) {
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
+            }
+            IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            StringBuilder builder = null;
+            for (IActiveEntityEventsListener listener : listeners) {
+                if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+                    if (builder == null) {
+                        builder = new StringBuilder();
+                    }
+                    builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
+                }
+            }
+            if (builder != null) {
+                throw new AsterixException(
+                        "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
+                                + builder.toString() + "\nOperation not supported.");
+            }
+
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
+                indexName = stmtIndexDrop.getIndexName().getValue();
+                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                if (index == null) {
+                    if (stmtIndexDrop.getIfExists()) {
+                        Metadat

<TRUNCATED>

Mime
View raw message