Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6A5CA200B77 for ; Sat, 20 Aug 2016 08:15:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 69299160AAB; Sat, 20 Aug 2016 06:15:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0A13160AC5 for ; Sat, 20 Aug 2016 08:15:44 +0200 (CEST) Received: (qmail 62730 invoked by uid 500); 20 Aug 2016 06:15:43 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 62281 invoked by uid 99); 20 Aug 2016 06:15:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Aug 2016 06:15:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6860BE05E1; Sat, 20 Aug 2016 06:15:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: buyingyi@apache.org To: commits@asterixdb.apache.org Date: Sat, 20 Aug 2016 06:15:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [10/16] asterixdb git commit: Add Asterix Extension Manager archived-at: Sat, 20 Aug 2016 06:15:47 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java deleted file mode 100644 index d6065fb..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java +++ /dev/null @@ -1,3107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.aql.translator; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Random; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActivityState; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.api.common.APIFramework; -import org.apache.asterix.api.common.SessionConfig; -import org.apache.asterix.app.external.ExternalIndexingOperations; -import org.apache.asterix.app.external.FeedJoint; -import org.apache.asterix.app.external.FeedOperations; -import org.apache.asterix.common.config.AsterixExternalProperties; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; -import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.config.MetadataConstants; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.api.IFeed.FeedType; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.management.FeedConnectionRequest; -import org.apache.asterix.external.feed.management.FeedEventsListener; -import org.apache.asterix.external.feed.management.FeedJointKey; -import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.feed.watch.FeedIntakeInfo; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.asterix.file.DatasetOperations; -import org.apache.asterix.file.DataverseOperations; -import org.apache.asterix.file.IndexOperations; -import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider; -import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; -import org.apache.asterix.lang.common.base.IRewriterFactory; -import org.apache.asterix.lang.common.base.IStatementRewriter; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.expression.TypeExpression; -import org.apache.asterix.lang.common.statement.CompactStatement; -import org.apache.asterix.lang.common.statement.ConnectFeedStatement; -import org.apache.asterix.lang.common.statement.CreateDataverseStatement; -import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; -import org.apache.asterix.lang.common.statement.CreateFeedStatement; -import org.apache.asterix.lang.common.statement.CreateFunctionStatement; -import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; -import org.apache.asterix.lang.common.statement.DatasetDecl; -import org.apache.asterix.lang.common.statement.DataverseDecl; -import org.apache.asterix.lang.common.statement.DataverseDropStatement; -import org.apache.asterix.lang.common.statement.DeleteStatement; -import org.apache.asterix.lang.common.statement.DisconnectFeedStatement; -import org.apache.asterix.lang.common.statement.DropStatement; -import org.apache.asterix.lang.common.statement.ExternalDetailsDecl; -import org.apache.asterix.lang.common.statement.FeedDropStatement; -import org.apache.asterix.lang.common.statement.FeedPolicyDropStatement; -import org.apache.asterix.lang.common.statement.FunctionDecl; -import org.apache.asterix.lang.common.statement.FunctionDropStatement; -import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl; -import org.apache.asterix.lang.common.statement.IndexDropStatement; -import org.apache.asterix.lang.common.statement.InsertStatement; -import org.apache.asterix.lang.common.statement.InternalDetailsDecl; -import org.apache.asterix.lang.common.statement.LoadStatement; -import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; -import org.apache.asterix.lang.common.statement.NodegroupDecl; -import org.apache.asterix.lang.common.statement.Query; -import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement; -import org.apache.asterix.lang.common.statement.RunStatement; -import org.apache.asterix.lang.common.statement.SetStatement; -import org.apache.asterix.lang.common.statement.TypeDecl; -import org.apache.asterix.lang.common.statement.TypeDropStatement; -import org.apache.asterix.lang.common.statement.WriteStatement; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.util.FunctionUtil; -import org.apache.asterix.metadata.IDatasetDetails; -import org.apache.asterix.metadata.MetadataException; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.api.IMetadataEntity; -import org.apache.asterix.metadata.dataset.hints.DatasetHints; -import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint; -import org.apache.asterix.metadata.declared.AqlMetadataProvider; -import org.apache.asterix.metadata.entities.CompactionPolicy; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Datatype; -import org.apache.asterix.metadata.entities.Dataverse; -import org.apache.asterix.metadata.entities.ExternalDatasetDetails; -import org.apache.asterix.metadata.entities.Feed; -import org.apache.asterix.metadata.entities.FeedPolicyEntity; -import org.apache.asterix.metadata.entities.Function; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; -import org.apache.asterix.metadata.feeds.FeedMetadataUtil; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.metadata.utils.KeyFieldTypeUtils; -import org.apache.asterix.metadata.utils.MetadataLockManager; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule; -import org.apache.asterix.result.ResultReader; -import org.apache.asterix.result.ResultUtils; -import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; -import org.apache.asterix.translator.AbstractLangTranslator; -import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; -import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; -import org.apache.asterix.translator.TypeTranslator; -import org.apache.asterix.translator.util.ValidateUtil; -import org.apache.asterix.util.FlushDatasetUtils; -import org.apache.asterix.util.JobUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.common.utils.Triple; -import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; -import org.apache.hyracks.algebricks.data.IAWriterFactory; -import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; -import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; -import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.FileSplit; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -import com.google.common.collect.Lists; - -/* - * Provides functionality for executing a batch of Query statements (queries included) - * sequentially. - */ -public class QueryTranslator extends AbstractLangTranslator { - - private static final Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName()); - - private enum ProgressState { - NO_PROGRESS, - ADDED_PENDINGOP_RECORD_TO_METADATA - } - - public enum ResultDelivery { - SYNC, - ASYNC, - ASYNC_DEFERRED - } - - public static final boolean IS_DEBUG_MODE = false;// true - private final List statements; - private final SessionConfig sessionConfig; - private Dataverse activeDefaultDataverse; - private final List declaredFunctions; - private final APIFramework apiFramework; - private final IRewriterFactory rewriterFactory; - - public QueryTranslator(List aqlStatements, SessionConfig conf, - ILangCompilationProvider compliationProvider) { - this.statements = aqlStatements; - this.sessionConfig = conf; - this.declaredFunctions = getDeclaredFunctions(aqlStatements); - this.apiFramework = new APIFramework(compliationProvider); - this.rewriterFactory = compliationProvider.getRewriterFactory(); - } - - private List getDeclaredFunctions(List statements) { - List functionDecls = new ArrayList<>(); - for (Statement st : statements) { - if (st.getKind() == Statement.Kind.FUNCTION_DECL) { - functionDecls.add((FunctionDecl) st); - } - } - return functionDecls; - } - - /** - * Compiles and submits for execution a list of AQL statements. - * - * @param hcc - * A Hyracks client connection that is used to submit a jobspec to Hyracks. - * @param hdc - * A Hyracks dataset client object that is used to read the results. - * @param resultDelivery - * True if the results should be read asynchronously or false if we should wait for results to be read. - * @return A List containing a QueryResult instance corresponding to each submitted query. - * @throws Exception - */ - public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery) - throws Exception { - compileAndExecute(hcc, hdc, resultDelivery, new ResultUtils.Stats()); - } - - public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - ResultUtils.Stats stats) throws Exception { - int resultSetIdCounter = 0; - FileSplit outputFile = null; - IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE; - IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE; - Map config = new HashMap<>(); - /* Since the system runs a large number of threads, when HTTP requests don't return, it becomes difficult to - * find the thread running the request to determine where it has stopped. - * Setting the thread name helps make that easier - */ - String threadName = Thread.currentThread().getName(); - Thread.currentThread().setName(QueryTranslator.class.getSimpleName()); - try { - for (Statement stmt : statements) { - if (sessionConfig.is(SessionConfig.FORMAT_HTML)) { - sessionConfig.out().println(APIFramework.HTML_STATEMENT_SEPARATOR); - } - validateOperation(activeDefaultDataverse, stmt); - rewriteStatement(stmt); // Rewrite the statement's AST. - AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse); - metadataProvider.setWriterFactory(writerFactory); - metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider); - metadataProvider.setOutputFile(outputFile); - metadataProvider.setConfig(config); - switch (stmt.getKind()) { - case Statement.Kind.SET: - handleSetStatement(stmt, config); - break; - case Statement.Kind.DATAVERSE_DECL: - activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt); - break; - case Statement.Kind.CREATE_DATAVERSE: - handleCreateDataverseStatement(metadataProvider, stmt); - break; - case Statement.Kind.DATASET_DECL: - handleCreateDatasetStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.CREATE_INDEX: - handleCreateIndexStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.TYPE_DECL: - handleCreateTypeStatement(metadataProvider, stmt); - break; - case Statement.Kind.NODEGROUP_DECL: - handleCreateNodeGroupStatement(metadataProvider, stmt); - break; - case Statement.Kind.DATAVERSE_DROP: - handleDataverseDropStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.DATASET_DROP: - handleDatasetDropStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.INDEX_DROP: - handleIndexDropStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.TYPE_DROP: - handleTypeDropStatement(metadataProvider, stmt); - break; - case Statement.Kind.NODEGROUP_DROP: - handleNodegroupDropStatement(metadataProvider, stmt); - break; - case Statement.Kind.CREATE_FUNCTION: - handleCreateFunctionStatement(metadataProvider, stmt); - break; - case Statement.Kind.FUNCTION_DROP: - handleFunctionDropStatement(metadataProvider, stmt); - break; - case Statement.Kind.LOAD: - handleLoadStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.INSERT: - case Statement.Kind.UPSERT: - handleInsertUpsertStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.DELETE: - handleDeleteStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.CREATE_PRIMARY_FEED: - case Statement.Kind.CREATE_SECONDARY_FEED: - handleCreateFeedStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.DROP_FEED: - handleDropFeedStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.DROP_FEED_POLICY: - handleDropFeedPolicyStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.CONNECT_FEED: - handleConnectFeedStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.DISCONNECT_FEED: - handleDisconnectFeedStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.SUBSCRIBE_FEED: - handleSubscribeFeedStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.CREATE_FEED_POLICY: - handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.QUERY: - metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); - metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC - || resultDelivery == ResultDelivery.ASYNC_DEFERRED); - handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats); - break; - case Statement.Kind.COMPACT: - handleCompactStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.EXTERNAL_DATASET_REFRESH: - handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc); - break; - case Statement.Kind.WRITE: - Pair result = handleWriteStatement(stmt); - writerFactory = (result.first != null) ? result.first : writerFactory; - outputFile = result.second; - break; - case Statement.Kind.RUN: - handleRunStatement(metadataProvider, stmt, hcc); - break; - default: - // Default should delegate unknown statement to extension-manager - break; - } - } - } finally { - Thread.currentThread().setName(threadName); - } - } - - private void handleSetStatement(Statement stmt, Map config) { - SetStatement ss = (SetStatement) stmt; - String pname = ss.getPropName(); - String pvalue = ss.getPropValue(); - config.put(pname, pvalue); - } - - private Pair handleWriteStatement(Statement stmt) - throws InstantiationException, IllegalAccessException, ClassNotFoundException { - WriteStatement ws = (WriteStatement) stmt; - File f = new File(ws.getFileName()); - FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f)); - IAWriterFactory writerFactory = null; - if (ws.getWriterClassName() != null) { - writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance(); - } - return new Pair<>(writerFactory, outputFile); - } - - private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) - throws Exception { - DataverseDecl dvd = (DataverseDecl) stmt; - String dvName = dvd.getDataverseName().getValue(); - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName); - try { - Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); - if (dv == null) { - throw new MetadataException("Unknown dataverse " + dvName); - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return dv; - } catch (Exception e) { - abort(e, e, mdTxnCtx); - throw new MetadataException(e); - } finally { - MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName); - } - } - - private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception { - - CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; - String dvName = stmtCreateDataverse.getDataverseName().getValue(); - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName); - try { - Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); - if (dv != null) { - if (stmtCreateDataverse.getIfNotExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("A dataverse with this name " + dvName + " already exists."); - } - } - MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), - new Dataverse(dvName, stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP)); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - abort(e, e, mdTxnCtx); - throw e; - } finally { - MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName); - } - } - - private void validateCompactionPolicy(String compactionPolicy, Map compactionPolicyProperties, - MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws AsterixException, Exception { - CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, - MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy); - if (compactionPolicyEntity == null) { - throw new AsterixException("Unknown compaction policy: " + compactionPolicy); - } - String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName(); - ILSMMergePolicyFactory mergePolicyFactory = - (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); - if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { - throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset."); - } - if (compactionPolicyProperties == null) { - if (mergePolicyFactory.getName().compareTo("no-merge") != 0) { - throw new AsterixException("Compaction policy properties are missing."); - } - } else { - for (Map.Entry entry : compactionPolicyProperties.entrySet()) { - if (!mergePolicyFactory.getPropertiesNames().contains(entry.getKey())) { - throw new AsterixException("Invalid compaction policy property: " + entry.getKey()); - } - } - for (String p : mergePolicyFactory.getPropertiesNames()) { - if (!compactionPolicyProperties.containsKey(p)) { - throw new AsterixException("Missing compaction policy property: " + p); - } - } - } - } - - private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws AsterixException, Exception { - - ProgressState progress = ProgressState.NO_PROGRESS; - DatasetDecl dd = (DatasetDecl) stmt; - String dataverseName = getActiveDataverse(dd.getDataverse()); - String datasetName = dd.getName().getValue(); - DatasetType dsType = dd.getDatasetType(); - String itemTypeDataverseName = dd.getItemTypeDataverse().getValue(); - String itemTypeName = dd.getItemTypeName().getValue(); - String metaItemTypeDataverseName = dd.getMetaItemTypeDataverse().getValue(); - String metaItemTypeName = dd.getMetaItemTypeName().getValue(); - Identifier ngNameId = dd.getNodegroupName(); - String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName); - String compactionPolicy = dd.getCompactionPolicy(); - Map compactionPolicyProperties = dd.getCompactionPolicyProperties(); - boolean defaultCompactionPolicy = (compactionPolicy == null); - boolean temp = dd.getDatasetDetailsDecl().isTemp(); - - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, itemTypeDataverseName, - itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName, - metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy, - dataverseName + "." + datasetName, defaultCompactionPolicy); - Dataset dataset = null; - try { - - IDatasetDetails datasetDetails = null; - Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName); - if (ds != null) { - if (dd.getIfNotExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("A dataset with this name " + datasetName + " already exists."); - } - } - Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), - itemTypeDataverseName, itemTypeName); - if (dt == null) { - throw new AlgebricksException(": type " + itemTypeName + " could not be found."); - } - String ngName = - ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx); - - if (compactionPolicy == null) { - compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME; - compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES; - } else { - validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false); - } - switch (dd.getDatasetType()) { - case INTERNAL: - IAType itemType = dt.getDatatype(); - if (itemType.getTypeTag() != ATypeTag.RECORD) { - throw new AlgebricksException("Dataset type has to be a record type."); - } - - IAType metaItemType = null; - if (metaItemTypeDataverseName != null && metaItemTypeName != null) { - metaItemType = metadataProvider.findType(metaItemTypeDataverseName, metaItemTypeName); - } - if (metaItemType != null && metaItemType.getTypeTag() != ATypeTag.RECORD) { - throw new AlgebricksException("Dataset meta type has to be a record type."); - } - ARecordType metaRecType = (ARecordType) metaItemType; - - List> partitioningExprs = - ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); - List keySourceIndicators = - ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators(); - boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated(); - ARecordType aRecordType = (ARecordType) itemType; - List partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType, - metaRecType, partitioningExprs, keySourceIndicators, autogenerated); - - List filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField(); - if (filterField != null) { - ValidateUtil.validateFilterField(aRecordType, filterField); - } - if (compactionPolicy == null) { - if (filterField != null) { - // If the dataset has a filter and the user didn't specify a merge - // policy, then we will pick the - // correlated-prefix as the default merge policy. - compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME; - compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES; - } - } - datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, - InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, - keySourceIndicators, partitioningTypes, autogenerated, filterField, temp); - break; - case EXTERNAL: - String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); - Map properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); - - datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), - ExternalDatasetTransactionState.COMMIT); - break; - default: - throw new AsterixException("Unknown datatype " + dd.getDatasetType()); - } - - // #. initialize DatasetIdFactory if it is not initialized. - if (!DatasetIdFactory.isInitialized()) { - DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId()); - } - - // #. add a new dataset with PendingAddOp - dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, - metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties, - datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(), - IMetadataEntity.PENDING_ADD_OP); - MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); - - if (dd.getDatasetType() == DatasetType.INTERNAL) { - Dataverse dataverse = - MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); - JobSpecification jobSpec = - DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider); - - // #. make metadataTxn commit before calling runJob. - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - - // #. runJob - JobUtils.runJob(hcc, jobSpec, true); - - // #. begin new metadataTxn - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - } - - // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp - MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); - dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP); - MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } - - if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { - - // #. execute compensation operations - // remove the index in NC - // [Notice] - // As long as we updated(and committed) metadata, we should remove any effect of the job - // because an exception occurs during runJob. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); - try { - JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); - } catch (Exception e2) { - e.addSuppressed(e2); - if (bActiveTxn) { - abort(e, e2, mdTxnCtx); - } - } - - // remove the record from the metadata. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName - + "." + datasetName + ") couldn't be removed from the metadata", e); - } - } - - throw e; - } finally { - MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, itemTypeDataverseName, - itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName, - metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy, - dataverseName + "." + datasetName, defaultCompactionPolicy); - } - } - - private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException { - StringBuilder builder = null; - IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - for (IActiveEntityEventsListener listener : listeners) { - if (listener instanceof FeedEventsListener - && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) { - if (builder == null) { - builder = new StringBuilder(); - } - builder.append(listener.getEntityId() + "\n"); - } - } - if (builder != null) { - throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being " - + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported"); - } - } - - private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) { - if (ngNameId != null) { - return ngNameId.getValue(); - } - String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME); - if (hintValue == null) { - return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME; - } else { - return dataverse + ":" + dd.getName().getValue(); - } - } - - private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx) - throws AsterixException { - int nodegroupCardinality; - String nodegroupName; - String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME); - if (hintValue == null) { - nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME; - return nodegroupName; - } else { - int numChosen = 0; - boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME, - dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first; - if (!valid) { - throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME); - } else { - nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)); - } - List nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames(); - List nodeNamesClone = new ArrayList(nodeNames); - String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName(); - List selectedNodes = new ArrayList(); - selectedNodes.add(metadataNodeName); - numChosen++; - nodeNamesClone.remove(metadataNodeName); - - if (numChosen < nodegroupCardinality) { - Random random = new Random(); - String[] nodes = nodeNamesClone.toArray(new String[] {}); - int[] b = new int[nodeNamesClone.size()]; - for (int i = 0; i < b.length; i++) { - b[i] = i; - } - - for (int i = 0; i < nodegroupCardinality - numChosen; i++) { - int selected = i + random.nextInt(nodeNamesClone.size() - i); - int selNodeIndex = b[selected]; - selectedNodes.add(nodes[selNodeIndex]); - int temp = b[0]; - b[0] = b[selected]; - b[selected] = temp; - } - } - nodegroupName = dataverse + ":" + dd.getName().getValue(); - MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes)); - return nodegroupName; - } - - } - - private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - ProgressState progress = ProgressState.NO_PROGRESS; - CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; - String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName()); - String datasetName = stmtCreateIndex.getDatasetName().getValue(); - List keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators(); - - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName); - - String indexName = null; - JobSpecification spec = null; - Dataset ds = null; - // For external datasets - ArrayList externalFilesSnapshot = null; - boolean firstExternalDatasetIndex = false; - boolean filesIndexReplicated = false; - Index filesIndex = null; - boolean datasetLocked = false; - try { - ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName); - if (ds == null) { - throw new AlgebricksException( - "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); - } - - indexName = stmtCreateIndex.getIndexName().getValue(); - Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, indexName); - Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), - ds.getItemTypeDataverseName(), ds.getItemTypeName()); - ARecordType aRecordType = (ARecordType) dt.getDatatype(); - ARecordType metaRecordType = null; - if (ds.hasMetaPart()) { - Datatype metaDt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), - ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName()); - metaRecordType = (ARecordType) metaDt.getDatatype(); - } - - List> indexFields = new ArrayList>(); - List indexFieldTypes = new ArrayList(); - int keyIndex = 0; - for (Pair, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) { - IAType fieldType = null; - ARecordType subType = - KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType); - boolean isOpen = subType.isOpen(); - int i = 0; - if (fieldExpr.first.size() > 1 && !isOpen) { - for (; i < fieldExpr.first.size() - 1;) { - subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i)); - i++; - if (subType.isOpen()) { - isOpen = true; - break; - } ; - } - } - if (fieldExpr.second == null) { - fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size())); - } else { - if (!stmtCreateIndex.isEnforced()) { - throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first - + "\" field without enforcing it's type"); - } - if (!isOpen) { - throw new AlgebricksException("Typed index on \"" + fieldExpr.first - + "\" field could be created only for open datatype"); - } - if (stmtCreateIndex.hasMetaField()) { - throw new AlgebricksException("Typed open index can only be created on the record part"); - } - Map typeMap = - TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second, indexName, dataverseName); - TypeSignature typeSignature = new TypeSignature(dataverseName, indexName); - fieldType = typeMap.get(typeSignature); - } - if (fieldType == null) { - throw new AlgebricksException( - "Unknown type " + (fieldExpr.second == null ? fieldExpr.first : fieldExpr.second)); - } - - indexFields.add(fieldExpr.first); - indexFieldTypes.add(fieldType); - ++keyIndex; - } - - ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, - indexFieldTypes, stmtCreateIndex.getIndexType()); - - if (idx != null) { - if (stmtCreateIndex.getIfNotExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("An index with this name " + indexName + " already exists."); - } - } - - // Checks whether a user is trying to create an inverted secondary index on a dataset - // with a variable-length primary key. - // Currently, we do not support this. Therefore, as a temporary solution, we print an - // error message and stop. - if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX - || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX - || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX - || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { - List> partitioningKeys = DatasetUtils.getPartitioningKeys(ds); - for (List partitioningKey : partitioningKeys) { - IAType keyType = aRecordType.getSubFieldType(partitioningKey); - ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType); - - // If it is not a fixed length - if (typeTrait.getFixedLength() < 0) { - throw new AlgebricksException("The keyword or ngram index -" + indexName - + " cannot be created on the dataset -" + datasetName - + " due to its variable-length primary key field - " + partitioningKey); - } - - } - } - - if (ds.getDatasetType() == DatasetType.INTERNAL) { - validateIfResourceIsActiveInFeed(dataverseName, datasetName); - } else { - // External dataset - // Check if the dataset is indexible - if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) { - throw new AlgebricksException( - "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter() - + " Adapter can't be indexed"); - } - // Check if the name of the index is valid - if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) { - throw new AlgebricksException("external dataset index name is invalid"); - } - - // Check if the files index exist - filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); - firstExternalDatasetIndex = (filesIndex == null); - // Lock external dataset - ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); - datasetLocked = true; - if (firstExternalDatasetIndex) { - // Verify that no one has created an index before we acquire the lock - filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), - dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); - if (filesIndex != null) { - ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); - firstExternalDatasetIndex = false; - ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); - } - } - if (firstExternalDatasetIndex) { - // Get snapshot from External File System - externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds); - // Add an entry for the files index - filesIndex = new Index(dataverseName, datasetName, - ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE, - ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, - ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, - IMetadataEntity.PENDING_ADD_OP); - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); - // Add files to the external files index - for (ExternalFile file : externalFilesSnapshot) { - MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); - } - // This is the first index for the external dataset, replicate the files index - spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot, - metadataProvider, true); - if (spec == null) { - throw new AsterixException( - "Failed to create job spec for replicating Files Index For external dataset"); - } - filesIndexReplicated = true; - JobUtils.runJob(hcc, spec, true); - } - } - - // check whether there exists another enforced index on the same field - if (stmtCreateIndex.isEnforced()) { - List indexes = MetadataManager.INSTANCE - .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); - for (Index index : indexes) { - if (index.getKeyFieldNames().equals(indexFields) - && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds()) { - throw new AsterixException("Cannot create index " + indexName + " , enforced index " - + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',') - + "\" is already defined with type \"" + index.getKeyFieldTypes() + "\""); - } - } - } - - // #. add a new index with PendingAddOp - Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields, - keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), - false, IMetadataEntity.PENDING_ADD_OP); - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); - - ARecordType enforcedType = null; - if (stmtCreateIndex.isEnforced()) { - enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, - Lists.newArrayList(index)); - } - - // #. prepare to create the index artifact in NC. - CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, - index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType()); - spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType, - keySourceIndicators, enforcedType, metadataProvider); - if (spec == null) { - throw new AsterixException("Failed to create job spec for creating index '" - + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'"); - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - - progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - - // #. create the index artifact in NC. - JobUtils.runJob(hcc, spec, true); - - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - // #. load data into the index in NC. - cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(), - index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), - index.getGramLength(), index.getIndexType()); - - spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType, - keySourceIndicators, enforcedType, metadataProvider); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - - JobUtils.runJob(hcc, spec, true); - - // #. begin new metadataTxn - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - // #. add another new index with PendingNoOp after deleting the index with PendingAddOp - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, - indexName); - index.setPendingOp(IMetadataEntity.PENDING_NO_OP); - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); - // add another new files index with PendingNoOp after deleting the index with - // PendingAddOp - if (firstExternalDatasetIndex) { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, - filesIndex.getIndexName()); - filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP); - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); - // update transaction timestamp - ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date()); - MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - - } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } - // If files index was replicated for external dataset, it should be cleaned up on NC side - if (filesIndexReplicated) { - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - ExternalIndexingOperations.getFilesIndexName(datasetName)); - try { - JobSpecification jobSpec = - ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); - } catch (Exception e2) { - e.addSuppressed(e2); - if (bActiveTxn) { - abort(e, e2, mdTxnCtx); - } - } - } - - if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { - // #. execute compensation operations - // remove the index in NC - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); - try { - JobSpecification jobSpec = - IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); - } catch (Exception e2) { - e.addSuppressed(e2); - if (bActiveTxn) { - abort(e, e2, mdTxnCtx); - } - } - - if (firstExternalDatasetIndex) { - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - // Drop External Files from metadata - MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending files for(" - + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e); - } - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - // Drop the files index from metadata - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName - + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName) - + ") couldn't be removed from the metadata", e); - } - } - // remove the record from the metadata. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, indexName); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName - + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); - } - } - throw e; - } finally { - MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName); - if (datasetLocked) { - ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); - } - } - } - - private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception { - TypeDecl stmtCreateType = (TypeDecl) stmt; - String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName()); - String typeName = stmtCreateType.getIdent().getValue(); - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName); - try { - - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); - if (dv == null) { - throw new AlgebricksException("Unknown dataverse " + dataverseName); - } - Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); - if (dt != null) { - if (!stmtCreateType.getIfNotExists()) { - throw new AlgebricksException("A datatype with this name " + typeName + " already exists."); - } - } else { - if (builtinTypeMap.get(typeName) != null) { - throw new AlgebricksException("Cannot redefine builtin type " + typeName + "."); - } else { - Map typeMap = TypeTranslator.computeTypes(mdTxnCtx, - stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(), dataverseName); - TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); - IAType type = typeMap.get(typeSignature); - MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); - } - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - abort(e, e, mdTxnCtx); - throw e; - } finally { - MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName); - } - } - - private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt; - String dataverseName = stmtDelete.getDataverseName().getValue(); - - ProgressState progress = ProgressState.NO_PROGRESS; - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName); - List jobsToExecute = new ArrayList(); - try { - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); - if (dv == null) { - if (stmtDelete.getIfExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("There is no dataverse with this name " + dataverseName + "."); - } - } - // # disconnect all feeds from any datasets in the dataverse. - IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - Identifier dvId = new Identifier(dataverseName); - for (IActiveEntityEventsListener listener : activeListeners) { - EntityId activeEntityId = listener.getEntityId(); - if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) - && activeEntityId.getDataverse().equals(dataverseName)) { - FeedEventsListener feedEventListener = (FeedEventsListener) listener; - FeedConnectionId[] connections = feedEventListener.getConnections(); - for (FeedConnectionId conn : connections) { - disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc); - } - // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob( - MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); - } - } - - // #. prepare jobs which will drop corresponding datasets with indexes. - List datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName); - for (int j = 0; j < datasets.size(); j++) { - String datasetName = datasets.get(j).getDatasetName(); - DatasetType dsType = datasets.get(j).getDatasetType(); - if (dsType == DatasetType.INTERNAL) { - List indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); - for (int k = 0; k < indexes.size(); k++) { - if (indexes.get(k).isSecondaryIndex()) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, - datasets.get(j))); - } - } - - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); - jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); - } else { - // External dataset - List indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); - for (int k = 0; k < indexes.size(); k++) { - if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, - metadataProvider, datasets.get(j))); - } else { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, - datasets.get(j))); - } - } - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j)); - } - } - jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider)); - // #. mark PendingDropOp on the dataverse record by - // first, deleting the dataverse record from the DATAVERSE_DATASET - // second, inserting the dataverse record with the PendingDropOp value into the - // DATAVERSE_DATASET - MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); - MetadataManager.INSTANCE.addDataverse(mdTxnCtx, - new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP)); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - // #. finally, delete the dataverse. - MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); - if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) { - activeDefaultDataverse = null; - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } - - if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { - if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) { - activeDefaultDataverse = null; - } - - // #. execute compensation operations - // remove the all indexes in NC - try { - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - } catch (Exception e2) { - // do no throw exception since still the metadata needs to be compensated. - e.addSuppressed(e2); - } - - // remove the record from the metadata. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - try { - MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName - + ") couldn't be removed from the metadata", e); - } - } - - throw e; - } finally { - MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName); - } - } - - private void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn, - AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc) { - DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId, - new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName())); - try { - handleDisconnectFeedStatement(metadataProvider, disStmt, hcc); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset " - + conn.getDatasetName()); - } - } catch (Exception exception) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to disconnect feed " + activeEntityId.getEntityName() + " from dataset " - + conn.getDatasetName() + ". Encountered exception " + exception); - } - } - } - - private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - DropStatement stmtDelete = (DropStatement) stmt; - String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); - String datasetName = stmtDelete.getDatasetName().getValue(); - - ProgressState progress = ProgressState.NO_PROGRESS; - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName); - List jobsToExecute = new ArrayList(); - try { - - Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); - if (ds == null) { - if (stmtDelete.getIfExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " - + dataverseName + "."); - } - } - - Map> disconnectJobList = new HashMap<>(); - if (ds.getDatasetType() == DatasetType.INTERNAL) { - // prepare job spec(s) that would disconnect any active feeds involving the dataset. - IActiveEntityEventsListener[] feedConnections = - ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - for (IActiveEntityEventsListener conn : feedConnections) { - if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME) - && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) { - FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName); - Pair p = - FeedOperations.buildDisconnectFeedJobSpec(metadataProvider, connectionId); - disconnectJobList.put(connectionId, p); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName() - + " from dataset " + datasetName + " as dataset is being dropped"); - } - // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob( - MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionId.getFeedId().getDataverse(), - connectionId.getFeedId().getEntityName()))); - } - } - - // #. prepare jobs to drop the datatset and the indexes in NC - List indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); - for (int j = 0; j < indexes.size(); j++) { - if (indexes.get(j).isSecondaryIndex()) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); - } - } - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); - jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); - - // #. mark the existing dataset as PendingDropOp - MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); - MetadataManager.INSTANCE.addDataset(mdTxnCtx, - new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), - ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(), - ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), - ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), - IMetadataEntity.PENDING_DROP_OP)); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - - // # disconnect the feeds - for (Pair p : disconnectJobList.values()) { - JobUtils.runJob(hcc, p.first, true); - } - - // #. run the jobs - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - } else { - // External dataset - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); - // #. prepare jobs to drop the datatset and the indexes in NC - List indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); - for (int j = 0; j < indexes.size(); j++) { - if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); - } else { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute - .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds)); - } - } - - // #. mark the existing dataset as PendingDropOp - MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); - MetadataManager.INSTANCE.addDataset(mdTxnCtx, - new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), - ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), - ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), - IMetadataEntity.PENDING_DROP_OP)); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - - // #. run the jobs - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - if (indexes.size() > 0) { - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); - } - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - } - - // #. finally, delete the dataset. - MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); - // Drop the associated nodegroup - String nodegroup = ds.getNodeGroupName(); - if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) { - MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName); - } - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } - - if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { - // #. execute compensation operations - // remove the all indexes in NC - try { - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - } catch (Exception e2) { - // do no throw exception since still the metadata needs to be compensated. - e.addSuppressed(e2); - } - - // remove the record from the metadata. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName - + "." + datasetName + ") couldn't be removed from the metadata", e); - } - } - - throw e; - } finally { - MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName); - } - } - - private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - - IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; - String datasetName = stmtIndexDrop.getDatasetName().getValue(); - String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName()); - ProgressState progress = ProgressState.NO_PROGRESS; - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; - metadataProvider.setMetadataTxnContext(mdTxnCtx); - - MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName); - - String indexName = null; - // For external index - boolean dropFilesIndex = false; - List jobsToExecute = new ArrayList(); - try { - - Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); - if (ds == null) { - throw new AlgebricksException( - "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); - } - IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - StringBuilder builder = null; - for (IActiveEntityEventsListener listener : listeners) { - if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME) - && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) { - if (builder == null) { - builder = new StringBuilder(); - } - builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n"); - } - } - if (builder != null) { - throw new AsterixException( - "Dataset" + datasetName + " is currently being fed into by the following feeds " + "." - + builder.toString() + "\nOperation not supported."); - } - - if (ds.getDatasetType() == DatasetType.INTERNAL) { - indexName = stmtIndexDrop.getIndexName().getValue(); - Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); - if (index == null) { - if (stmtIndexDrop.getIfExists()) { - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - return; - } else { - throw new AlgebricksException("There is no index with this name " + indexName + "."); - } - } - // #. prepare a job to drop the index in NC. - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); - - // #. mark PendingDropOp on the existing index - MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); - MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), - index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); - - // #. commit the existing transaction before calling runJob. - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; -