From: amoudi@apache.org
To: commits@asterixdb.incubator.apache.org
Reply-To: dev@asterixdb.incubator.apache.org
Date: Mon, 22 Feb 2016 22:35:11 -0000
In-Reply-To: <599702c4f0254acfa2e3eeee75299be2@git.apache.org>
References: <599702c4f0254acfa2e3eeee75299be2@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [23/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
deleted file mode 100644
index a1c6fb9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.file.FeedOperations;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedTrackingManager implements IFeedTrackingManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
-
-    private final BitSet allOnes;
-
-    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
-    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
-
-    public FeedTrackingManager() {
-        byte[] allOneBytes = new byte[128];
-        Arrays.fill(allOneBytes, (byte) 0xff);
-        allOnes = BitSet.valueOf(allOneBytes);
-        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
-        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
-    }
-
-    @Override
-    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
-        AckId ackId = getAckId(ackMessage);
-        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
-        if (acksForConnection == null) {
-            acksForConnection = new HashMap<AckId, BitSet>();
-            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
-            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
-        }
-        BitSet currentAcks = acksForConnection.get(ackId);
-        if (currentAcks == null) {
-            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
-            acksForConnection.put(ackId, currentAcks);
-        } else {
-            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
-        }
-        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
-            }
-            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
-            if (maxBaseAckedForConnection == null) {
-                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
-                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
-            }
-            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
-            if (maxBaseAckedValue == null) {
-                maxBaseAckedValue = ackMessage.getBase();
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
-                }
-            }
-
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
-            }
-        }
-    }
-
-    public synchronized void disableTracking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-    }
-
-    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
-        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        String collectLocation = collectLocations.get(partition);
-        Set<String> messageDestinations = new HashSet<String>();
-        messageDestinations.add(collectLocation);
-        messageDestinations.addAll(storageLocations);
-        try {
-            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
-            CentralFeedManager.runJob(spec, false);
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
-            }
-        }
-    }
-
-    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
-        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
-    }
-
-    private static class AckId {
-        private FeedConnectionId connectionId;
-        private int intakePartition;
-        private int base;
-
-        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
-            this.connectionId = connectionId;
-            this.intakePartition = intakePartition;
-            this.base = base;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof AckId)) {
-                return false;
-            }
-            AckId other = (AckId) o;
-            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
-                    && other.getBase() == base;
-        }
-
-        @Override
-        public String toString() {
-            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-
-        public FeedConnectionId getConnectionId() {
-            return connectionId;
-        }
-
-        public int getIntakePartition() {
-            return intakePartition;
-        }
-
-        public int getBase() {
-            return base;
-        }
-
-    }
-
-    @Override
-    public void disableAcking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Acking disabled for " + connectionId);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
deleted file mode 100644
index 9d746c8..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.feed; - -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.api.common.SessionConfig; -import org.apache.asterix.api.common.SessionConfig.OutputFormat; -import org.apache.asterix.aql.translator.QueryTranslator; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.feed.api.IFeedWork; -import org.apache.asterix.external.feed.api.IFeedWorkEventListener; -import org.apache.asterix.external.feed.management.FeedCollectInfo; -import org.apache.asterix.external.feed.management.FeedConnectionRequest; -import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus; -import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.statement.DataverseDecl; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.hyracks.api.job.JobId; - -/** - * A collection of feed management related task, each represented as an implementation of {@code IFeedWork}. - */ -public class FeedWorkCollection { - - private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName()); - private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - - /** - * The task of subscribing to a feed to obtain data. 
- */ - public static class SubscribeFeedWork implements IFeedWork { - - private final Runnable runnable; - - private final FeedConnectionRequest request; - - @Override - public Runnable getRunnable() { - return runnable; - } - - public SubscribeFeedWork(String[] locations, FeedConnectionRequest request) { - this.runnable = new SubscribeFeedWorkRunnable(locations, request); - this.request = request; - } - - private static class SubscribeFeedWorkRunnable implements Runnable { - - private final FeedConnectionRequest request; - private final String[] locations; - - public SubscribeFeedWorkRunnable(String[] locations, FeedConnectionRequest request) { - this.request = request; - this.locations = locations; - } - - @Override - public void run() { - try { - PrintWriter writer = new PrintWriter(System.out, true); - SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM); - DataverseDecl dataverseDecl = new DataverseDecl( - new Identifier(request.getReceivingFeedId().getDataverse())); - SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request); - List statements = new ArrayList(); - statements.add(dataverseDecl); - statements.add(subscribeStmt); - QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider); - translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, - QueryTranslator.ResultDelivery.SYNC); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Submitted connection requests for execution: " + request); - } - } catch (Exception e) { - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.severe("Exception in executing " + request); - } - throw new RuntimeException(e); - } - } - } - - public static class FeedSubscribeWorkEventListener implements IFeedWorkEventListener { - - @Override - public void workFailed(IFeedWork work, Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request - + " failed with exception " + e); - } - } - - @Override - public void workCompleted(IFeedWork work) { - ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed "); - } - } - - } - - public FeedConnectionRequest getRequest() { - return request; - } - - @Override - public String toString() { - return "SubscribeFeedWork for [" + request + "]"; - } - - } - - /** - * The task of activating a set of feeds. 
- */ - public static class ActivateFeedWork implements IFeedWork { - - private final Runnable runnable; - - @Override - public Runnable getRunnable() { - return runnable; - } - - public ActivateFeedWork(List feedsToRevive) { - this.runnable = new FeedsActivateRunnable(feedsToRevive); - } - - public ActivateFeedWork() { - this.runnable = new FeedsActivateRunnable(); - } - - private static class FeedsActivateRunnable implements Runnable { - - private List feedsToRevive; - private Mode mode; - - public enum Mode { - REVIVAL_POST_NODE_REJOIN - } - - public FeedsActivateRunnable(List feedsToRevive) { - this.feedsToRevive = feedsToRevive; - } - - public FeedsActivateRunnable() { - } - - @Override - public void run() { - switch (mode) { - case REVIVAL_POST_NODE_REJOIN: - try { - Thread.sleep(10000); - } catch (InterruptedException e1) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Attempt to resume feed interrupted"); - } - throw new IllegalStateException(e1.getMessage()); - } - for (FeedCollectInfo finfo : feedsToRevive) { - try { - JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId); - LOGGER.info("Job:" + finfo.jobSpec); - } - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning( - "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage()); - } - } - } - } - } - - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java deleted file mode 100644 index b30d8a7..0000000 --- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.feed; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IClusterManagementWork; -import org.apache.asterix.common.api.IClusterManagementWorkResponse; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.feed.watch.FeedIntakeInfo; -import org.apache.asterix.external.feed.watch.FeedJobInfo; -import org.apache.asterix.metadata.cluster.AddNodeWork; -import org.apache.asterix.metadata.cluster.AddNodeWorkResponse; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.constraints.Constraint; -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.constraints.expressions.ConstantExpression; -import org.apache.hyracks.api.constraints.expressions.ConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag; -import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; -import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; -import org.apache.hyracks.api.job.JobSpecification; - -public class FeedWorkRequestResponseHandler implements Runnable { - - private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName()); - - private final LinkedBlockingQueue inbox; - - private Map>> feedsWaitingForResponse = new HashMap>>(); - - public FeedWorkRequestResponseHandler(LinkedBlockingQueue inbox) { - this.inbox = inbox; - } - - @Override - public void run() { - while (true) { - IClusterManagementWorkResponse response = null; - try { - response = inbox.take(); - } catch (InterruptedException e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Interrupted exception " + e.getMessage()); - } - } - IClusterManagementWork submittedWork = response.getWork(); - Map nodeSubstitution = new HashMap(); - switch (submittedWork.getClusterManagementWorkType()) { - case ADD_NODE: - AddNodeWork addNodeWork = (AddNodeWork) submittedWork; - int workId = addNodeWork.getWorkId(); - Map> failureAnalysis = feedsWaitingForResponse.get(workId); - AddNodeWorkResponse resp = (AddNodeWorkResponse) response; - List nodesAdded = resp.getNodesAdded(); - List unsubstitutedNodes = new ArrayList(); - unsubstitutedNodes.addAll(addNodeWork.getDeadNodes()); - int nodeIndex = 0; - - /** form a mapping between the failed node and its substitute **/ - if (nodesAdded != null && nodesAdded.size() > 0) { - for (String failedNodeId : addNodeWork.getDeadNodes()) { - String substitute = nodesAdded.get(nodeIndex); - nodeSubstitution.put(failedNodeId, substitute); - nodeIndex = (nodeIndex + 1) % nodesAdded.size(); - unsubstitutedNodes.remove(failedNodeId); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId); - } - } - } - if (unsubstitutedNodes.size() > 0) { - String[] 
participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes() - .toArray(new String[] {}); - nodeIndex = 0; - for (String unsubstitutedNode : unsubstitutedNodes) { - nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node " - + unsubstitutedNode); - } - nodeIndex = (nodeIndex + 1) % participantNodes.length; - } - - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes"); - } - } - - // alter failed feed intake jobs - - for (Entry> entry : failureAnalysis.entrySet()) { - String failedNode = entry.getKey(); - List impactedJobInfos = entry.getValue(); - for (FeedJobInfo info : impactedJobInfos) { - JobSpecification spec = info.getSpec(); - replaceNode(spec, failedNode, nodeSubstitution.get(failedNode)); - info.setSpec(spec); - } - } - - Set revisedIntakeJobs = new HashSet(); - Set revisedConnectJobInfos = new HashSet(); - - for (List infos : failureAnalysis.values()) { - for (FeedJobInfo info : infos) { - switch (info.getJobType()) { - case INTAKE: - revisedIntakeJobs.add((FeedIntakeInfo) info); - break; - case FEED_CONNECT: - revisedConnectJobInfos.add((FeedConnectJobInfo) info); - break; - } - } - } - - IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc(); - try { - for (FeedIntakeInfo info : revisedIntakeJobs) { - hcc.startJob(info.getSpec()); - } - Thread.sleep(2000); - for (FeedConnectJobInfo info : revisedConnectJobInfos) { - hcc.startJob(info.getSpec()); - Thread.sleep(2000); - } - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to start revised job post failure"); - } - } - - break; - case REMOVE_NODE: - throw new IllegalStateException("Invalid work submitted"); - } - } - } - - private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) { - Set userConstraints = jobSpec.getUserConstraints(); - List locationConstraintsToReplace = new ArrayList(); - List countConstraintsToReplace = new ArrayList(); - List modifiedOperators = new ArrayList(); - Map> candidateConstraints = new HashMap>(); - Map> newConstraints = new HashMap>(); - OperatorDescriptorId opId = null; - for (Constraint constraint : userConstraints) { - LValueConstraintExpression lexpr = constraint.getLValue(); - ConstraintExpression cexpr = constraint.getRValue(); - switch (lexpr.getTag()) { - case PARTITION_COUNT: - opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId(); - if (modifiedOperators.contains(opId)) { - countConstraintsToReplace.add(constraint); - } else { - List clist = candidateConstraints.get(opId); - if (clist == null) { - clist = new ArrayList(); - candidateConstraints.put(opId, clist); - } - clist.add(constraint); - } - break; - case PARTITION_LOCATION: - opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId(); - String oldLocation = (String) ((ConstantExpression) cexpr).getValue(); - if (oldLocation.equals(failedNodeId)) { - locationConstraintsToReplace.add(constraint); - modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId()); - Map newLocs = newConstraints.get(opId); - if (newLocs == null) { - newLocs = new HashMap(); - newConstraints.put(opId, newLocs); - } - int partition = ((PartitionLocationExpression) lexpr).getPartition(); - newLocs.put(partition, replacementNode); - } else { - if (modifiedOperators.contains(opId)) { - 
locationConstraintsToReplace.add(constraint); - Map newLocs = newConstraints.get(opId); - if (newLocs == null) { - newLocs = new HashMap(); - newConstraints.put(opId, newLocs); - } - int partition = ((PartitionLocationExpression) lexpr).getPartition(); - newLocs.put(partition, oldLocation); - } else { - List clist = candidateConstraints.get(opId); - if (clist == null) { - clist = new ArrayList(); - candidateConstraints.put(opId, clist); - } - clist.add(constraint); - } - } - break; - default: - break; - } - } - - jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace); - jobSpec.getUserConstraints().removeAll(countConstraintsToReplace); - - for (OperatorDescriptorId mopId : modifiedOperators) { - List clist = candidateConstraints.get(mopId); - if (clist != null && !clist.isEmpty()) { - jobSpec.getUserConstraints().removeAll(clist); - - for (Constraint c : clist) { - if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) { - ConstraintExpression cexpr = c.getRValue(); - int partition = ((PartitionLocationExpression) c.getLValue()).getPartition(); - String oldLocation = (String) ((ConstantExpression) cexpr).getValue(); - newConstraints.get(mopId).put(partition, oldLocation); - } - } - } - } - - for (Entry> entry : newConstraints.entrySet()) { - OperatorDescriptorId nopId = entry.getKey(); - Map clist = entry.getValue(); - IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId); - String[] locations = new String[clist.size()]; - for (int i = 0; i < locations.length; i++) { - locations[i] = clist.get(i); - } - PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations); - } - - } - - public void registerFeedWork(int workId, Map> impactedJobs) { - feedsWaitingForResponse.put(workId, impactedJobs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java deleted file mode 100644 index dc02a53..0000000 --- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.feed; - -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.api.common.SessionConfig; -import org.apache.asterix.api.common.SessionConfig.OutputFormat; -import org.apache.asterix.aql.translator.QueryTranslator; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.feed.management.FeedCollectInfo; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.statement.ConnectFeedStatement; -import org.apache.asterix.lang.common.statement.DataverseDecl; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.hyracks.api.job.JobId; - -public class FeedsActivator implements Runnable { - - private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName()); - private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - - private List feedsToRevive; - private Mode mode; - - public enum Mode { - REVIVAL_POST_CLUSTER_REBOOT, - REVIVAL_POST_NODE_REJOIN - } - - public FeedsActivator() { - this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT; - } - - public FeedsActivator(List feedsToRevive) { - this.feedsToRevive = feedsToRevive; - this.mode = Mode.REVIVAL_POST_NODE_REJOIN; - } - - @Override - public void run() { - switch (mode) { - case REVIVAL_POST_CLUSTER_REBOOT: - //revivePostClusterReboot(); - break; - case REVIVAL_POST_NODE_REJOIN: - try { - Thread.sleep(10000); - } catch (InterruptedException e1) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Attempt to resume feed interrupted"); - } - throw new IllegalStateException(e1.getMessage()); - } - for (FeedCollectInfo finfo : feedsToRevive) { - try { - JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId); - LOGGER.info("Job:" + finfo.jobSpec); - } - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage()); - } - } - } - } - } - - public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) { - PrintWriter writer = new PrintWriter(System.out, true); - SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM); - try { - DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse)); - ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName), - new Identifier(dataset), feedPolicy, 0); - stmt.setForceConnect(true); - List statements = new ArrayList(); - statements.add(dataverseDecl); - statements.add(stmt); - QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider); - translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, - QueryTranslator.ResultDelivery.SYNC); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy); - } - } catch (Exception e) { - e.printStackTrace(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy " - + feedPolicy + " Exception " 
+ e.getMessage()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java deleted file mode 100644 index 77c6a54..0000000 --- a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java +++ /dev/null @@ -1,760 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.file; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.api.ILocalResourceMetadata; -import org.apache.asterix.common.config.AsterixStorageProperties; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; -import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.config.IAsterixPropertiesProvider; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; -import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.FilesIndexDescription; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor; -import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor; -import org.apache.asterix.external.provider.AdapterFactoryProvider; -import 
org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider; -import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; -import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider; -import org.apache.asterix.metadata.MetadataException; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.declared.AqlMetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.ExternalDatasetDetails; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.asterix.om.util.NonTaggedFormatUtil; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; -import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; -import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; -import 
org.apache.hyracks.storage.common.file.LocalResource; - -public class ExternalIndexingOperations { - - public static final List> FILE_INDEX_FIELD_NAMES = new ArrayList>(); - public static final ArrayList FILE_INDEX_FIELD_TYPES = new ArrayList(); - - static { - FILE_INDEX_FIELD_NAMES.add(new ArrayList(Arrays.asList(""))); - FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING); - } - - public static boolean isIndexible(ExternalDatasetDetails ds) { - String adapter = ds.getAdapter(); - if (adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER)) { - return true; - } - return false; - } - - public static boolean isRefereshActive(ExternalDatasetDetails ds) { - return ds.getState() != ExternalDatasetTransactionState.COMMIT; - } - - public static boolean isValidIndexName(String datasetName, String indexName) { - return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName)); - } - - public static String getFilesIndexName(String datasetName) { - return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX); - } - - public static int getRIDSize(Dataset dataset) { - ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails()); - return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)); - } - - public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) { - ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails()); - return IndexingConstants.getComparatorFactories((dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT))); - } - - public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() { - return IndexingConstants.getBuddyBtreeComparatorFactories(); - } - - public static ArrayList getSnapshotFromExternalFileSystem(Dataset dataset) - throws AlgebricksException { - ArrayList files = new ArrayList(); - ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - try { - // Create the file system object - FileSystem fs = getFileSystemObject(datasetDetails.getProperties()); - // Get paths of dataset - String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH); - String[] paths = path.split(","); - - // Add fileStatuses to files - for (String aPath : paths) { - FileStatus[] fileStatuses = fs.listStatus(new Path(aPath)); - for (int i = 0; i < fileStatuses.length; i++) { - int nextFileNumber = files.size(); - if (fileStatuses[i].isDirectory()) { - listSubFiles(dataset, fs, fileStatuses[i], files); - } else { - files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, - fileStatuses[i].getPath().toUri().getPath(), - new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(), - ExternalFilePendingOp.PENDING_NO_OP)); - } - } - } - // Close file system - fs.close(); - if (files.size() == 0) { - throw new AlgebricksException("File Snapshot retrieved from external file system is empty"); - } - return files; - } catch (Exception e) { - e.printStackTrace(); - throw new AlgebricksException("Unable to get list of HDFS files " + e); - } - } - - /* list all files under the directory - * src is expected to be a folder - */ - private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList files) - throws IOException { - Path path = src.getPath(); - FileStatus[] fileStatuses = srcFs.listStatus(path); - for (int i = 0; i < fileStatuses.length; i++) { - int nextFileNumber = files.size(); - if 
(fileStatuses[i].isDirectory()) { - listSubFiles(dataset, srcFs, fileStatuses[i], files); - } else { - files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, - fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()), - fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP)); - } - } - } - - public static FileSystem getFileSystemObject(Map map) throws IOException { - Configuration conf = new Configuration(); - conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim()); - conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName()); - return FileSystem.get(conf); - } - - public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset, - ArrayList externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex) - throws MetadataException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance(); - AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map mergePolicyFactoryProperties = compactionInfo.second; - Pair secondarySplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(), - getFilesIndexName(dataset.getDatasetName()), true); - IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first; - FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata( - filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES, - new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties); - PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( - localResourceMetadata, LocalResource.ExternalBTreeResource); - ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory( - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider, - externalFilesSnapshot, createIndex); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp, - secondarySplitsAndConstraint.second); - spec.addRoot(externalFilesOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - /** - * This method create an indexing operator that index records in HDFS - * - * @param jobSpec - * @param itemType - * @param dataset - * @param files - 
* @param indexerDesc - * @return - * @throws Exception - */ - private static Pair getExternalDataIndexingOperator( - JobSpecification jobSpec, IAType itemType, Dataset dataset, List files, - RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception { - ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - Map configuration = externalDatasetDetails.getProperties(); - IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(externalDatasetDetails.getAdapter(), - configuration, (ARecordType) itemType, files, true); - return new Pair( - new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory), - adapterFactory.getPartitionConstraint()); - } - - public static Pair createExternalIndexingOp( - JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType, - RecordDescriptor indexerDesc, List files) throws Exception { - if (files == null) { - files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset); - } - return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider); - } - - /** - * At the end of this method, we expect to have 4 sets as follows: - * metadataFiles should contain only the files that are appended in their original state - * addedFiles should contain new files that has number assigned starting after the max original file number - * deletedFiles should contain files that are no longer there in the file system - * appendedFiles should have the new file information of existing files - * The method should return false in case of zero delta - * - * @param dataset - * @param metadataFiles - * @param addedFiles - * @param deletedFiles - * @param appendedFiles - * @return - * @throws MetadataException - * @throws AlgebricksException - */ - public static boolean isDatasetUptodate(Dataset dataset, List metadataFiles, - List addedFiles, List deletedFiles, List appendedFiles) - throws MetadataException, AlgebricksException { - boolean uptodate = true; - int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1; - - ArrayList fileSystemFiles = getSnapshotFromExternalFileSystem(dataset); - - // Loop over file system files < taking care of added files > - for (ExternalFile fileSystemFile : fileSystemFiles) { - boolean fileFound = false; - Iterator mdFilesIterator = metadataFiles.iterator(); - while (mdFilesIterator.hasNext()) { - ExternalFile metadataFile = mdFilesIterator.next(); - if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) { - // Same file name - if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) { - // Same timestamp - if (fileSystemFile.getSize() == metadataFile.getSize()) { - // Same size -> no op - mdFilesIterator.remove(); - fileFound = true; - } else { - // Different size -> append op - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP); - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP); - appendedFiles.add(fileSystemFile); - fileFound = true; - uptodate = false; - } - } else { - // Same file name, Different file mod date -> delete and add - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP); - deletedFiles - .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0, - metadataFile.getFileName(), metadataFile.getLastModefiedTime(), - metadataFile.getSize(), 
ExternalFilePendingOp.PENDING_DROP_OP)); - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP); - fileSystemFile.setFileNumber(newFileNumber); - addedFiles.add(fileSystemFile); - newFileNumber++; - fileFound = true; - uptodate = false; - } - } - if (fileFound) { - break; - } - } - if (!fileFound) { - // File not stored previously in metadata -> pending add op - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP); - fileSystemFile.setFileNumber(newFileNumber); - addedFiles.add(fileSystemFile); - newFileNumber++; - uptodate = false; - } - } - - // Done with files from external file system -> metadata files now contain both deleted files and appended ones - // first, correct number assignment to deleted and updated files - for (ExternalFile deletedFile : deletedFiles) { - deletedFile.setFileNumber(newFileNumber); - newFileNumber++; - } - for (ExternalFile appendedFile : appendedFiles) { - appendedFile.setFileNumber(newFileNumber); - newFileNumber++; - } - - // include the remaining deleted files - Iterator mdFilesIterator = metadataFiles.iterator(); - while (mdFilesIterator.hasNext()) { - ExternalFile metadataFile = mdFilesIterator.next(); - if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) { - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP); - deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), - newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), - metadataFile.getSize(), metadataFile.getPendingOp())); - newFileNumber++; - uptodate = false; - } - } - return uptodate; - } - - public static Dataset createTransactionDataset(Dataset dataset) { - ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails(); - ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(), - originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN); - Dataset transactionDatset = new Dataset(dataset.getDataverseName(), dataset.getDatasetName(), - dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), dataset.getNodeGroupName(), - dataset.getCompactionPolicy(), dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), - DatasetType.EXTERNAL, dataset.getDatasetId(), dataset.getPendingOp()); - return transactionDatset; - } - - public static boolean isFileIndex(Index index) { - return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName()))); - } - - public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt, - AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException { - String dataverseName = indexDropStmt.getDataverseName() == null ? 
metadataProvider.getDefaultDataverseName()
-                : indexDropStmt.getDataverseName();
-        String datasetName = indexDropStmt.getDatasetName();
-        String indexName = indexDropStmt.getIndexName();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
-                splitsAndConstraint.second);
-        spec.addRoot(btreeDrop);
-
-        return spec;
-    }
-
-    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<>();
-        for (ExternalFile file : metadataFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                files.add(file);
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                for (ExternalFile appendedFile : appendedFiles) {
-                    if (appendedFile.getFileName().equals(file.getFileName())) {
-                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(),
-                                file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-        }
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        Collections.sort(files);
-        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
-    }
-
-    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
-        // Create files list
-        ArrayList<ExternalFile> files = new ArrayList<>();
-
-        for (ExternalFile metadataFile : metadataFiles) {
-            if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) {
-                files.add(metadataFile);
-            } else {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-                files.add(metadataFile);
-            }
-        }
-        // add new files
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        // add appended files
-        for (ExternalFile file : appendedFiles) {
-            files.add(file);
-        }
-
-        CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
-                index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
-    }
-
-    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, AqlMetadataProvider metadataProvider, JobSpecification spec)
-            throws AlgebricksException, AsterixException {
-        int numPrimaryKeys = getRIDSize(ds);
-        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
-        secondaryKeyFields.size();
-        ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getItemTypeDataverseName(),
-                ds.getItemTypeName());
-        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType spatialType = spatialTypePair.first;
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-        }
-        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = numDimensions * 2;
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        ATypeTag keyType = nestedKeyType.getTypeTag();
-
-        keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(nestedKeyType, true);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-        }
-        // Add serializers and comparators for primary index fields.
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
-        }
-        int[] primaryKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
-        }
-
-        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-
-    }
-
-    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
-        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
-        spec.addRoot(compactOp);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
-                secondarySplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-}
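
Taken together, the helpers deleted above implement the refresh protocol for an external dataset: the files index and each secondary index are brought up to date against the new file snapshot, the new index versions are then committed, and a failed refresh is rolled back (or recovered after a restart). The sketch below is illustrative only and is not part of the deleted file; refreshExternalDataset and submitJob are hypothetical stand-ins for the caller and for whatever submits a JobSpecification to the cluster.

    // Hypothetical sketch (not part of the deleted file): how a caller could chain the
    // deleted job-spec builders to refresh an external dataset's indexes.
    private static void refreshExternalDataset(Dataset ds, List<Index> indexes,
            List<ExternalFile> metadataFiles, List<ExternalFile> deletedFiles,
            List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
            AqlMetadataProvider metadataProvider) throws Exception {
        // 1. Bring the files index up to date with the new snapshot of external files.
        submitJob(buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles, appendedFiles,
                metadataProvider));
        // 2. Rebuild each secondary index against the updated file list.
        for (Index index : indexes) {
            submitJob(buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles, addedFiles,
                    appendedFiles, metadataProvider));
        }
        try {
            // 3. Commit the new index versions.
            submitJob(buildCommitJob(ds, indexes, metadataProvider));
        } catch (Exception e) {
            // On failure, roll back to the previously committed versions;
            // buildRecoverOp would be used instead after a crash/restart.
            submitJob(buildAbortOp(ds, indexes, metadataProvider));
            throw e;
        }
    }
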