asterixdb-commits mailing list archives

From ima...@apache.org
Subject [10/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:43:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
new file mode 100644
index 0000000..93917cc
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
@@ -0,0 +1,3069 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.translator;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.api.common.APIFramework;
+import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
+import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DatasetDecl;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
+import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DropStatement;
+import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
+import edu.uci.ics.asterix.aql.expression.FunctionDecl;
+import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
+import edu.uci.ics.asterix.aql.expression.InsertStatement;
+import edu.uci.ics.asterix.aql.expression.InternalDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.LoadStatement;
+import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
+import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
+import edu.uci.ics.asterix.aql.expression.SetStatement;
+import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
+import edu.uci.ics.asterix.aql.expression.TypeDecl;
+import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.TypeExpression;
+import edu.uci.ics.asterix.aql.expression.WriteStatement;
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.FeedJointType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedJoint;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
+import edu.uci.ics.asterix.file.DatasetOperations;
+import edu.uci.ics.asterix.file.DataverseOperations;
+import edu.uci.ics.asterix.file.ExternalIndexingOperations;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.file.IndexOperations;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
+import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Datatype;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.TypeSignature;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
+import edu.uci.ics.asterix.result.ResultReader;
+import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
+import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDeleteStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import edu.uci.ics.asterix.translator.TypeTranslator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+/**
+ * Provides functionality for executing a batch of AQL statements (queries included)
+ * sequentially.
+ */
+public class AqlTranslator extends AbstractAqlTranslator {
+
+    private static final Logger LOGGER = Logger.getLogger(AqlTranslator.class.getName());
+
+    private enum ProgressState {
+        NO_PROGRESS, // no permanent metadata change has been committed yet
+        ADDED_PENDINGOP_RECORD_TO_METADATA // a pending-op record was committed and must be compensated on failure
+    }
+
+    public enum ResultDelivery {
+        SYNC, // wait for each query job and deliver its results inline
+        ASYNC, // return a result handle immediately, before the job completes
+        ASYNC_DEFERRED // return the result handle only once the job has completed
+    }
+
+    public static final boolean IS_DEBUG_MODE = false; // set to true for extra diagnostic output
+    private final List<Statement> aqlStatements;
+    private final SessionConfig sessionConfig;
+    private Dataverse activeDefaultDataverse;
+    private final List<FunctionDecl> declaredFunctions;
+
+    public AqlTranslator(List<Statement> aqlStatements, SessionConfig conf) throws MetadataException, AsterixException {
+        this.aqlStatements = aqlStatements;
+        this.sessionConfig = conf;
+        declaredFunctions = getDeclaredFunctions(aqlStatements);
+    }
+
+    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
+        List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
+        for (Statement st : statements) {
+            if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
+                functionDecls.add((FunctionDecl) st);
+            }
+        }
+        return functionDecls;
+    }
+
+    /**
+     * Compiles and submits for execution a list of AQL statements.
+     * 
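+     * <p>
+     * Minimal usage sketch (illustrative only; the parse step and the connection handles are
+     * assumptions obtained outside this class, not part of its API):
+     *
+     * <pre>
+     * List&lt;Statement&gt; statements = parseAql(aqlText); // hypothetical parser call
+     * AqlTranslator translator = new AqlTranslator(statements, sessionConfig);
+     * translator.compileAndExecute(hcc, hdc, ResultDelivery.SYNC);
+     * </pre>
+     *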
+     * @param hcc
+     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
+     * @param hdc
+     *            A Hyracks dataset client object that is used to read the results.
+     * @param resultDelivery
+     *            How results are delivered: written out synchronously (SYNC), or via a result handle
+     *            that is returned immediately (ASYNC) or once the job completes (ASYNC_DEFERRED).
+     * @throws Exception
+     */
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+            throws Exception {
+        int resultSetIdCounter = 0;
+        FileSplit outputFile = null;
+        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+        IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
+        Map<String, String> config = new HashMap<String, String>();
+
+        for (Statement stmt : aqlStatements) {
+            validateOperation(activeDefaultDataverse, stmt);
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
+                    CentralFeedManager.getInstance());
+            metadataProvider.setWriterFactory(writerFactory);
+            metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
+            metadataProvider.setOutputFile(outputFile);
+            metadataProvider.setConfig(config);
+            switch (stmt.getKind()) {
+                case SET: {
+                    handleSetStatement(metadataProvider, stmt, config);
+                    break;
+                }
+                case DATAVERSE_DECL: {
+                    activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
+                    break;
+                }
+                case CREATE_DATAVERSE: {
+                    handleCreateDataverseStatement(metadataProvider, stmt);
+                    break;
+                }
+                case DATASET_DECL: {
+                    handleCreateDatasetStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case CREATE_INDEX: {
+                    handleCreateIndexStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case TYPE_DECL: {
+                    handleCreateTypeStatement(metadataProvider, stmt);
+                    break;
+                }
+                case NODEGROUP_DECL: {
+                    handleCreateNodeGroupStatement(metadataProvider, stmt);
+                    break;
+                }
+                case DATAVERSE_DROP: {
+                    handleDataverseDropStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case DATASET_DROP: {
+                    handleDatasetDropStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case INDEX_DROP: {
+                    handleIndexDropStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case TYPE_DROP: {
+                    handleTypeDropStatement(metadataProvider, stmt);
+                    break;
+                }
+                case NODEGROUP_DROP: {
+                    handleNodegroupDropStatement(metadataProvider, stmt);
+                    break;
+                }
+
+                case CREATE_FUNCTION: {
+                    handleCreateFunctionStatement(metadataProvider, stmt);
+                    break;
+                }
+
+                case FUNCTION_DROP: {
+                    handleFunctionDropStatement(metadataProvider, stmt);
+                    break;
+                }
+
+                case LOAD: {
+                    handleLoadStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case INSERT: {
+                    handleInsertStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+                case DELETE: {
+                    handleDeleteStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case CREATE_PRIMARY_FEED:
+                case CREATE_SECONDARY_FEED: {
+                    handleCreateFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case DROP_FEED: {
+                    handleDropFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case DROP_FEED_POLICY: {
+                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case CONNECT_FEED: {
+                    handleConnectFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case DISCONNECT_FEED: {
+                    handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case SUBSCRIBE_FEED: {
+                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case CREATE_FEED_POLICY: {
+                    handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case QUERY: {
+                    metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                    metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                            || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                    handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery);
+                    break;
+                }
+
+                case COMPACT: {
+                    handleCompactStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case EXTERNAL_DATASET_REFRESH: {
+                    handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case WRITE: {
+                    Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt);
+                    if (result.first != null) {
+                        writerFactory = result.first;
+                    }
+                    outputFile = result.second;
+                    break;
+                }
+
+                case RUN: {
+                    handleRunStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+            }
+        }
+    }
+
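+    // A SET statement simply records a property in the per-request config map that the compiler
+    // consults later; e.g. in AQL (illustrative): set simfunction "jaccard";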
+    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config)
+            throws RemoteException, ACIDException {
+        SetStatement ss = (SetStatement) stmt;
+        String pname = ss.getPropName();
+        String pvalue = ss.getPropValue();
+        config.put(pname, pvalue);
+    }
+
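+    // Handles AQL's write statement, which redirects query output to a file on a node controller
+    // and optionally installs a custom writer; e.g. (illustrative): write output to nc1:"/tmp/results.adm";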
+    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        WriteStatement ws = (WriteStatement) stmt;
+        File f = new File(ws.getFileName());
+        FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
+        IAWriterFactory writerFactory = null;
+        if (ws.getWriterClassName() != null) {
+            writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
+        }
+        return new Pair<IAWriterFactory, FileSplit>(writerFactory, outputFile);
+    }
+
+    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+            throws Exception {
+        DataverseDecl dvd = (DataverseDecl) stmt;
+        String dvName = dvd.getDataverseName().getValue();
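+        // Metadata access pattern used throughout this class: begin a metadata transaction, acquire
+        // the relevant lock, then commit on success or abort and rethrow on failure, always releasing
+        // the lock in the finally block.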
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+        try {
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv == null) {
+                throw new MetadataException("Unknown dataverse " + dvName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return dv;
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw new MetadataException(e);
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+        }
+    }
+
+    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+
+        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+        String dvName = stmtCreateDataverse.getDataverseName().getValue();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+        try {
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv != null) {
+                if (stmtCreateDataverse.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+                }
+            }
+            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+        }
+    }
+
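+    /**
+     * Validates a user-specified compaction policy against the policies registered in the Metadata
+     * dataverse: the policy must exist, and the supplied properties must match the factory's declared
+     * property names exactly. A sketch of the clause this guards (AQL; names and values illustrative):
+     *
+     * <pre>
+     * create dataset Tweets(TweetType) primary key id
+     * using compaction policy prefix (("max-mergable-component-size"="1073741824"));
+     * </pre>
+     */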
+    private void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
+            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws Exception {
+        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
+        if (compactionPolicyEntity == null) {
+            throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
+        }
+        String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
+        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(
+                compactionPolicyFactoryClassName).newInstance();
+        if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
+            throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
+        }
+        if (compactionPolicyProperties == null) {
+            if (mergePolicyFactory.getName().compareTo("no-merge") != 0) {
+                throw new AsterixException("Compaction policy properties are missing.");
+            }
+        } else {
+            for (Map.Entry<String, String> entry : compactionPolicyProperties.entrySet()) {
+                if (!mergePolicyFactory.getPropertiesNames().contains(entry.getKey())) {
+                    throw new AsterixException("Invalid compaction policy property: " + entry.getKey());
+                }
+            }
+            for (String p : mergePolicyFactory.getPropertiesNames()) {
+                if (!compactionPolicyProperties.containsKey(p)) {
+                    throw new AsterixException("Missing compaction policy property: " + p);
+                }
+            }
+        }
+    }
+
+    private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        DatasetDecl dd = (DatasetDecl) stmt;
+        String dataverseName = getActiveDataverse(dd.getDataverse());
+        String datasetName = dd.getName().getValue();
+        DatasetType dsType = dd.getDatasetType();
+        String itemTypeName = dd.getItemTypeName().getValue();
+        Identifier ngNameId = dd.getNodegroupName();
+        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
+        String compactionPolicy = dd.getCompactionPolicy();
+        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
+        boolean defaultCompactionPolicy = (compactionPolicy == null);
+        boolean temp = dd.getDatasetDetailsDecl().isTemp();
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, dataverseName + "." + itemTypeName,
+                nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
+        Dataset dataset = null;
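+        // Dataset creation follows the pending-op protocol: (1) commit a metadata record tagged
+        // PENDING_ADD_OP, (2) run the Hyracks job that creates the physical dataset, then
+        // (3) replace the record with a PENDING_NO_OP copy. If a later step fails, the catch block
+        // compensates by dropping whatever was created and removing the pending record.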
+        try {
+
+            IDatasetDetails datasetDetails = null;
+            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds != null) {
+                if (dd.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+                }
+            }
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    itemTypeName);
+            if (dt == null) {
+                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+            }
+            String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName,
+                    mdTxnCtx);
+
+            if (compactionPolicy == null) {
+                compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
+                compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
+            } else {
+                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
+            }
+            switch (dd.getDatasetType()) {
+                case INTERNAL: {
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Can only partition ARecord's.");
+                    }
+                    List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
+                    ARecordType aRecordType = (ARecordType) itemType;
+                    List<IAType> partitioningTypes = aRecordType.validatePartitioningExpressions(partitioningExprs,
+                            autogenerated);
+
+                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
+                    if (filterField != null) {
+                        aRecordType.validateFilterField(filterField);
+                    }
+                    if (defaultCompactionPolicy && filterField != null) {
+                        // If the dataset has a filter and the user didn't specify a merge policy, pick
+                        // correlated-prefix as the default. (compactionPolicy itself was already defaulted
+                        // above, so test the flag captured before that assignment.)
+                        compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
+                        compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
+                    }
+                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            partitioningTypes, autogenerated, filterField, temp);
+                    break;
+                }
+                case EXTERNAL: {
+                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+
+                    datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
+                            ExternalDatasetTransactionState.COMMIT);
+                    break;
+                }
+
+            }
+
+            //#. initialize DatasetIdFactory if it is not initialized.
+            if (!DatasetIdFactory.isInitialized()) {
+                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
+            }
+
+            //#. add a new dataset with PendingAddOp
+            dataset = new Dataset(dataverseName, datasetName, itemTypeName, ngName, compactionPolicy,
+                    compactionPolicyProperties, datasetDetails, dd.getHints(), dsType,
+                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+            if (dd.getDatasetType() == DatasetType.INTERNAL) {
+                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                        dataverseName);
+                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+                        metadataProvider);
+
+                //#. make metadataTxn commit before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+                //#. runJob
+                runJob(hcc, jobSpec, true);
+
+                //#. begin new metadataTxn
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            }
+
+            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+
+                //#. execute compensation operations
+                //   remove the index in NC
+                //   [Notice]
+                //   As long as we updated(and committed) metadata, we should remove any effect of the job
+                //   because an exception occurs during runJob.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                try {
+                    JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+
+                    runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, dataverseName + "." + itemTypeName,
+                    nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
+        }
+    }
+
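+    // Rejects the requested operation if the dataset is the target of an active feed connection;
+    // the error message lists every offending connection.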
+    private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
+        List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+        boolean resourceInUse = false;
+        StringBuilder builder = new StringBuilder();
+
+        if (activeFeedConnections != null && !activeFeedConnections.isEmpty()) {
+            for (FeedConnectionId connId : activeFeedConnections) {
+                if (connId.getDatasetName().equals(datasetName)) {
+                    resourceInUse = true;
+                    builder.append(connId + "\n");
+                }
+            }
+        }
+
+        if (resourceInUse) {
+            throw new AsterixException("Dataset " + datasetName + " is currently being "
+                    + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
+        }
+
+    }
+
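+    // Resolves the nodegroup for a new dataset: an explicit nodegroup wins; a cardinality hint
+    // yields a dataset-specific nodegroup named dataverse:dataset (created later by
+    // configureNodegroupForDataset); otherwise the default metadata nodegroup is used.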
+    private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
+        if (ngNameId != null) {
+            return ngNameId.getValue();
+        }
+        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        if (hintValue == null) {
+            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+        } else {
+            return (dataverse + ":" + dd.getName().getValue());
+        }
+    }
+
+    private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
+            throws AsterixException {
+        int nodegroupCardinality = -1;
+        String nodegroupName;
+        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        if (hintValue == null) {
+            nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+            return nodegroupName;
+        } else {
+            int numChosen = 0;
+            boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
+                    dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
+            if (!valid) {
+                throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
+            } else {
+                nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
+            }
+            Set<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+            Set<String> nodeNamesClone = new HashSet<String>(nodeNames);
+            String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+            List<String> selectedNodes = new ArrayList<String>();
+            selectedNodes.add(metadataNodeName);
+            numChosen++;
+            nodeNamesClone.remove(metadataNodeName);
+
+            if (numChosen < nodegroupCardinality) {
+                // Partial Fisher-Yates shuffle: pick the remaining nodes uniformly at random,
+                // without replacement, from the non-metadata nodes.
+                Random random = new Random();
+                String[] nodes = nodeNamesClone.toArray(new String[] {});
+                int[] b = new int[nodeNamesClone.size()];
+                for (int i = 0; i < b.length; i++) {
+                    b[i] = i;
+                }
+
+                for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
+                    int selected = i + random.nextInt(nodeNamesClone.size() - i);
+                    int selNodeIndex = b[selected];
+                    selectedNodes.add(nodes[selNodeIndex]);
+                    // Swap the chosen slot behind the selection frontier so it cannot be picked again.
+                    int temp = b[i];
+                    b[i] = b[selected];
+                    b[selected] = temp;
+                }
+            }
+            }
+            nodegroupName = dataverse + ":" + dd.getName().getValue();
+            MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
+            return nodegroupName;
+        }
+
+    }
+
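+    // Index creation mirrors the dataset pending-op protocol, with extra phases for external
+    // datasets: the very first index also creates and replicates a files index, and every
+    // committed step has a matching compensation in the catch block below.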
+    @SuppressWarnings("unchecked")
+    private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+        String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
+        String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
+        String indexName = null;
+        JobSpecification spec = null;
+        Dataset ds = null;
+        // For external datasets
+        ArrayList<ExternalFile> externalFilesSnapshot = null;
+        boolean firstExternalDatasetIndex = false;
+        boolean filesIndexReplicated = false;
+        Index filesIndex = null;
+        boolean datasetLocked = false;
+        try {
+            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds == null) {
+                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                        + dataverseName);
+            }
+
+            indexName = stmtCreateIndex.getIndexName().getValue();
+            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName, indexName);
+
+            String itemTypeName = ds.getItemTypeName();
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    itemTypeName);
+            IAType itemType = dt.getDatatype();
+            ARecordType aRecordType = (ARecordType) itemType;
+
+            List<List<String>> indexFields = new ArrayList<List<String>>();
+            List<IAType> indexFieldTypes = new ArrayList<IAType>();
+            for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
+                IAType fieldType = null;
+                boolean isOpen = aRecordType.isOpen();
+                ARecordType subType = aRecordType;
+                int i = 0;
+                if (fieldExpr.first.size() > 1 && !isOpen) {
+                    // Walk down the nested record types along the field path until the last step,
+                    // or until an open (schema-less) subtype is reached.
+                    while (i < fieldExpr.first.size() - 1) {
+                        subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i));
+                        i++;
+                        if (subType.isOpen()) {
+                            isOpen = true;
+                            break;
+                        }
+                    }
+                }
+                if (fieldExpr.second == null) {
+                    fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
+                } else {
+                    if (!stmtCreateIndex.isEnforced()) {
+                        throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first
+                                + "\" field without enforcing its type");
+                    }
+                    if (!isOpen) {
+                        throw new AlgebricksException("Typed index on \"" + fieldExpr.first
+                                + "\" field can only be created for an open datatype");
+                    }
+                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
+                            indexName, dataverseName);
+                    TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
+                    fieldType = typeMap.get(typeSignature);
+                }
+                if (fieldType == null) {
+                    throw new AlgebricksException("Unknown type " + fieldExpr.second);
+                }
+
+                indexFields.add(fieldExpr.first);
+                indexFieldTypes.add(fieldType);
+            }
+
+            aRecordType.validateKeyFields(indexFields, indexFieldTypes, stmtCreateIndex.getIndexType());
+
+            if (idx != null) {
+                if (stmtCreateIndex.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+                }
+            }
+
+            // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
+            if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+                List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds);
+                for (List<String> partitioningKey : partitioningKeys) {
+                    IAType keyType = aRecordType.getSubFieldType(partitioningKey);
+                    ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+
+                    // If it is not a fixed length
+                    if (typeTrait.getFixedLength() < 0) {
+                        throw new AlgebricksException("The keyword or ngram index -" + indexName
+                                + " cannot be created on the dataset -" + datasetName
+                                + " due to its variable-length primary key field - " + partitioningKey);
+                    }
+
+                }
+            }
+
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
+                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
+            } else {
+                // External dataset
+                // Check if the dataset is indexible
+                if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
+                    throw new AlgebricksException("dataset using "
+                            + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
+                            + " Adapter can't be indexed");
+                }
+                // check if the name of the index is valid
+                if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
+                    throw new AlgebricksException("external dataset index name is invalid");
+                }
+
+                // Check if the files index exist
+                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                firstExternalDatasetIndex = (filesIndex == null);
+                // lock external dataset
+                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                datasetLocked = true;
+                if (firstExternalDatasetIndex) {
+                    // verify that no one has created an index before we acquire the lock
+                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                            dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                    if (filesIndex != null) {
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+                        firstExternalDatasetIndex = false;
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                    }
+                }
+                if (firstExternalDatasetIndex) {
+                    // Get snapshot from External File System
+                    externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
+                    // Add an entry for the files index
+                    filesIndex = new Index(dataverseName, datasetName,
+                            ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+                            IMetadataEntity.PENDING_ADD_OP);
+                    MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
+                    // Add files to the external files index
+                    for (ExternalFile file : externalFilesSnapshot) {
+                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+                    }
+                    // This is the first index for the external dataset, replicate the files index
+                    spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
+                            metadataProvider, true);
+                    if (spec == null) {
+                        throw new AsterixException(
+                                "Failed to create job spec for replicating the files index for external dataset");
+                    }
+                    filesIndexReplicated = true;
+                    runJob(hcc, spec, true);
+                }
+            }
+
+            //check whether there exists another enforced index on the same field
+            if (stmtCreateIndex.isEnforced()) {
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
+                        metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+                for (Index index : indexes) {
+                    if (index.getKeyFieldNames().equals(indexFields)
+                            && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds()) {
+                        throw new AsterixException("Cannot create index " + indexName + ": enforced index "
+                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
+                                + "\" already exists");
+                    }
+                }
+            }
+
+            //#. add a new index with PendingAddOp
+            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
+                    indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), false,
+                    IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+
+            ARecordType enforcedType = null;
+            if (stmtCreateIndex.isEnforced()) {
+                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
+            }
+
+            //#. prepare to create the index artifact in NC.
+            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
+                    index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
+                    index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
+            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+            if (spec == null) {
+                throw new AsterixException("Failed to create job spec for creating index '"
+                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+            //#. create the index artifact in NC.
+            runJob(hcc, spec, true);
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. load data into the index in NC.
+            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
+                    index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
+                    index.getGramLength(), index.getIndexType());
+            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            runJob(hcc, spec, true);
+
+            //#. begin new metadataTxn
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                    indexName);
+            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
+            // add another new files index with PendingNoOp after deleting the index with PendingAddOp
+            if (firstExternalDatasetIndex) {
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                        datasetName, filesIndex.getIndexName());
+                filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+                MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
+                // update transaction timestamp
+                ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
+                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            // If files index was replicated for external dataset, it should be cleaned up on NC side
+            if (filesIndexReplicated) {
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                        ExternalIndexingOperations.getFilesIndexName(datasetName));
+                try {
+                    JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
+                            metadataProvider, ds);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+                    runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                //#. execute compensation operations
+                //   remove the index in NC
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+                try {
+                    JobSpecification jobSpec = IndexOperations
+                            .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
+
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    bActiveTxn = false;
+                    runJob(hcc, jobSpec, true);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    if (bActiveTxn) {
+                        abort(e, e2, mdTxnCtx);
+                    }
+                }
+
+                if (firstExternalDatasetIndex) {
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    try {
+                        // Drop External Files from metadata
+                        MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        abort(e, e2, mdTxnCtx);
+                        throw new IllegalStateException("System is inconsistent state: pending files for("
+                                + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
+                    }
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    try {
+                        // Drop the files index from metadata
+                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                                datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        abort(e, e2, mdTxnCtx);
+                        throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
+                                + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName)
+                                + ") couldn't be removed from the metadata", e);
+                    }
+                }
+                // remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName, indexName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
+                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                }
+            }
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
+            if (datasetLocked) {
+                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+            }
+        }
+    }
+
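+    /**
+     * Creates a datatype in the target dataverse. The whole operation runs in
+     * a single metadata transaction: the dataverse and name checks, the type
+     * computation, and the metadata insert either all commit together or are
+     * rolled back by the abort in the catch block.
+     */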
+    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+        TypeDecl stmtCreateType = (TypeDecl) stmt;
+        String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName());
+        String typeName = stmtCreateType.getIdent().getValue();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
+        try {
+
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                throw new AlgebricksException("Unknown dataverse " + dataverseName);
+            }
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+            if (dt != null) {
+                if (!stmtCreateType.getIfNotExists()) {
+                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
+                }
+            } else {
+                if (builtinTypeMap.get(typeName) != null) {
+                    throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+                } else {
+                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx,
+                            stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(), dataverseName);
+                    TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
+                    IAType type = typeMap.get(typeSignature);
+                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
+                }
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
+        }
+    }
+
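+    /**
+     * Drops a dataverse and everything in it. The handler follows the common
+     * DDL protocol: disconnect active feeds, build the drop jobs, mark the
+     * dataverse record PENDING_DROP_OP and commit, run the physical drop jobs
+     * outside a metadata transaction, then delete the record in a fresh
+     * transaction. The catch block compensates when a step fails after the
+     * pending record has become durable.
+     */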
+    private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+        String dataverseName = stmtDelete.getDataverseName().getValue();
+
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
+        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
+        try {
+
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                if (stmtDelete.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
+                }
+            }
+
+            //# disconnect all feeds from any datasets in the dataverse.
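+            // getActiveFeedConnections(null) is assumed here to return every
+            // active connection; the loop below keeps only those whose feed
+            // belongs to this dataverse.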
+            List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
+                    .getActiveFeedConnections(null);
+            DisconnectFeedStatement disStmt = null;
+            Identifier dvId = new Identifier(dataverseName);
+            for (FeedConnectionId connection : activeFeedConnections) {
+                FeedId feedId = connection.getFeedId();
+                if (feedId.getDataverse().equals(dataverseName)) {
+                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
+                            connection.getDatasetName()));
+                    try {
+                        handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
+                                    + connection.getDatasetName());
+                        }
+                    } catch (Exception exception) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
+                                    + connection.getDatasetName() + ". Encountered exception " + exception);
+                        }
+                    }
+                }
+            }
+
+            //#. prepare jobs which will drop corresponding datasets with indexes.
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
+            for (int j = 0; j < datasets.size(); j++) {
+                String datasetName = datasets.get(j).getDatasetName();
+                DatasetType dsType = datasets.get(j).getDatasetType();
+                if (dsType == DatasetType.INTERNAL) {
+                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                            datasetName);
+                    for (int k = 0; k < indexes.size(); k++) {
+                        if (indexes.get(k).isSecondaryIndex()) {
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                    indexes.get(k).getIndexName());
+                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+                                    datasets.get(j)));
+                        }
+                    }
+
+                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+                } else {
+                    // External dataset
+                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                            datasetName);
+                    for (int k = 0; k < indexes.size(); k++) {
+                        if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                    indexes.get(k).getIndexName());
+                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
+                                    metadataProvider, datasets.get(j)));
+                        } else {
+                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                    indexes.get(k).getIndexName());
+                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+                                    datasets.get(j)));
+                        }
+                    }
+                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
+                }
+            }
+            jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
+
+            //#. mark PendingDropOp on the dataverse record by
+            //   first, deleting the dataverse record from the DATAVERSE_DATASET
+            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
+                    IMetadataEntity.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
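+            // The PendingDropOp record is now durable and the drop jobs run
+            // outside any metadata transaction; failures from here on are
+            // repaired by the compensation logic in the catch block.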
+
+            for (JobSpecification jobSpec : jobsToExecute) {
+                runJob(hcc, jobSpec, true);
+            }
+
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            //#. finally, delete the dataverse.
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
+                activeDefaultDataverse = null;
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
+                    activeDefaultDataverse = null;
+                }
+
+                //#. execute compensation operations
+                //   remove all the indexes in NC
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception here; the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                }
+
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                try {
+                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName
+                            + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
+        }
+    }
+
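+    /**
+     * Drops a dataset (internal or external). Internal datasets first
+     * disconnect any active feeds; external datasets also drop their files
+     * index and deregister from the ExternalDatasetsRegistry. Both paths use
+     * the same PENDING_DROP_OP / run-jobs / final-delete protocol as the
+     * dataverse drop above.
+     */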
+    private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        DropStatement stmtDelete = (DropStatement) stmt;
+        String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
+        String datasetName = stmtDelete.getDatasetName().getValue();
+
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
+        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
+        try {
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (ds == null) {
+                if (stmtDelete.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName
+                            + " in dataverse " + dataverseName + ".");
+                }
+            }
+
+            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
+                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
+                List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+                if (feedConnections != null && !feedConnections.isEmpty()) {
+                    for (FeedConnectionId connection : feedConnections) {
+                        Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
+                                connection);
+                        disconnectJobList.put(connection, p);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
+                                    + datasetName + " as dataset is being dropped");
+                        }
+                    }
+                }
+
+                //#. prepare jobs to drop the dataset and its indexes in NC
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+                for (int j = 0; j < indexes.size(); j++) {
+                    if (indexes.get(j).isSecondaryIndex()) {
+                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                indexes.get(j).getIndexName());
+                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+                    }
+                }
+                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
+                jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
+
+                //#. mark the existing dataset as PendingDropOp
+                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+                MetadataManager.INSTANCE.addDataset(
+                        mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
+                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
+                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
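+                // Pending record committed; the feed disconnects and drop jobs
+                // below run outside a metadata transaction.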
+
+                //# disconnect the feeds
+                for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
+                    runJob(hcc, p.first, true);
+                }
+
+                //#. run the jobs
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec, true);
+                }
+
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            } else {
+                // External dataset
+                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
+                //#. prepare jobs to drop the dataset and its indexes in NC
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+                for (int j = 0; j < indexes.size(); j++) {
+                    if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
+                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                indexes.get(j).getIndexName());
+                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+                    } else {
+                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                indexes.get(j).getIndexName());
+                        jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider,
+                                ds));
+                    }
+                }
+
+                //#. mark the existing dataset as PendingDropOp
+                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+                MetadataManager.INSTANCE.addDataset(
+                        mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
+                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
+                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+                //#. run the jobs
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec, true);
+                }
+                if (indexes.size() > 0) {
+                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
+                }
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            }
+
+            //#. finally, delete the dataset.
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+            // Drop the associated nodegroup
+            String nodegroup = ds.getNodeGroupName();
+            if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
+                MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                //#. execute compensation operations
+                //   remove all the indexes in NC
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception here; the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                }
+
+                //   remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                try {
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                            datasetName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
+                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                }
+            }
+
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
+        }
+    }
+
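+    /**
+     * Drops a secondary index. For an external dataset, when only the dropped
+     * index and the files index would remain, the files index and the cached
+     * external-file metadata are dropped as well.
+     */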
+    private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+
+        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+        String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
+        String indexName = null;
+        // For external index
+        boolean dropFilesIndex = false;
+        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
+        try {
+
+            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (ds == null) {
+                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                        + dataverseName);
+            }
+
+            List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+            boolean resourceInUse = false;
+            if (feedConnections != null && !feedConnections.isEmpty()) {
+                StringBuilder builder = new StringBuilder();
+                for (FeedConnectionId connection : feedConnections) {
+                    if (connection.getDatasetName().equals(datasetName)) {
+                        resourceInUse = true;
+                        builder.append(connection + "\n");
+                    }
+                }
+                if (resourceInUse) {
+                    throw new AsterixException("Dataset" + datasetName
+                            + " is currently being fed into by the following feeds " + "." + builder.toString()
+                            + "\nOperation not supported.");
+                }
+            }
+
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
+                indexName = stmtIndexDrop.getIndexName().getValue();
+                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                if (index == null) {
+                    if (stmtIndexDrop.getIfExists()) {
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                        return;
+                    } else {
+                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
+                    }
+                }
+                //#. prepare a job to drop the index in NC.
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+
+                //#. mark PendingDropOp on the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                MetadataManager.INSTANCE.addIndex(
+                        mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
+                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
+                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+                //#. commit the existing transaction before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
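+                // The index is now marked PendingDropOp in committed metadata;
+                // the physical drop job runs outside a metadata transaction.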
+
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec, true);
+                }
+
+                //#. begin a new transaction
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+                //#. finally, delete the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+            } else {
+                // External dataset
+                indexName = stmtIndexDrop.getIndexName().getValue();
+                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                if (index == null) {
+                    if (stmtIndexDrop.getIfExists()) {
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                        return;
+                    } else {
+                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
+                    }
+                } else if (ExternalIndexingOperations.isFileIndex(index)) {
+                    throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
+                }
+                //#. prepare a job to drop the index in NC.
+                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
+                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
+                List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                        datasetName);
+                if (datasetIndexes.size() == 2) {
+                    dropFilesIndex = true;
+                    // only one index + the files index, we need to delete both of the indexes
+                    for (Index externalIndex : datasetIndexes) {
+                        if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
+                            cds = new CompiledIndexDropStatement(dataverseName, datasetName,
+                                    externalIndex.getIndexName());
+                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
+                                    metadataProvider, ds));
+                            //#. mark PendingDropOp on the existing files index
+                            MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
+                                    externalIndex.getIndexName());
+                            MetadataManager.INSTANCE.addIndex(
+                                    mdTxnCtx,
+                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(),
+                                            externalIndex.getIndexType(), externalIndex.getKeyFieldNames(),
+                                            externalIndex.getKeyFieldTypes(), externalIndex.isEnforcingKeyFileds(),
+                                            externalIndex.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                        }
+                    }
+                }
+
+                //#. mark PendingDropOp on the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                MetadataManager.INSTANCE.addIndex(
+                        mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
+                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
+                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+
+                //#. commit the existing transaction before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+                for (JobSpecification jobSpec : jobsToExecute) {
+                    runJob(hcc, jobSpec, true);
+                }
+
+                //#. begin a new transaction
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+                //#. finally, delete the existing index
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                if (dropFilesIndex) {
+                    // delete the files index too
+                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
+                            ExternalIndexingOperations.getFilesIndexName(datasetName));
+                    MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
+                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
+                }
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                //#. execute compensation operations
+                //   remove all the indexes in NC
+                try {
+                    for (JobSpecification jobSpec : jobsToExecute) {
+                        runJob(hcc, jobSpec, true);
+                    }
+                } catch (Exception e2) {
+                    //do not throw the exception here; the metadata still needs to be compensated.
+                    e.addSuppressed(e2);
+                

<TRUNCATED>

