asterixdb-commits mailing list archives

From amo...@apache.org
Subject [23/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests
Date Mon, 22 Feb 2016 22:35:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
deleted file mode 100644
index a1c6fb9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.file.FeedOperations;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedTrackingManager implements IFeedTrackingManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
-
-    private final BitSet allOnes;
-
-    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
-    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
-
-    public FeedTrackingManager() {
-        byte[] allOneBytes = new byte[128];
-        Arrays.fill(allOneBytes, (byte) 0xff);
-        allOnes = BitSet.valueOf(allOneBytes);
-        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
-        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
-    }
-
-    @Override
-    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
-        AckId ackId = getAckId(ackMessage);
-        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
-        if (acksForConnection == null) {
-            acksForConnection = new HashMap<AckId, BitSet>();
-            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
-            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
-        }
-        BitSet currentAcks = acksForConnection.get(ackId);
-        if (currentAcks == null) {
-            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
-            acksForConnection.put(ackId, currentAcks);
-        } else {
-            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
-        }
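-        // A base is considered covered once every bit of the 128-byte ack bitmap is
-        // set; commit responses are sent only for contiguous bases, so acknowledged
-        // bases advance in order.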
-        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ") is covered");
-            }
-            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
-            if (maxBaseAckedForConnection == null) {
-                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
-                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
-            }
-            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
-            if (maxBaseAckedValue == null) {
-                maxBaseAckedValue = ackMessage.getBase();
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
-                }
-            }
-
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
-            }
-        }
-    }
-
-    public synchronized void disableTracking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-    }
-
-    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
-        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        String collectLocation = collectLocations.get(partition);
-        Set<String> messageDestinations = new HashSet<String>();
-        messageDestinations.add(collectLocation);
-        messageDestinations.addAll(storageLocations);
-        try {
-            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
-            CentralFeedManager.runJob(spec, false);
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
-            }
-        }
-    }
-
-    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
-        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
-    }
-
-    private static class AckId {
-        private FeedConnectionId connectionId;
-        private int intakePartition;
-        private int base;
-
-        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
-            this.connectionId = connectionId;
-            this.intakePartition = intakePartition;
-            this.base = base;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof AckId)) {
-                return false;
-            }
-            AckId other = (AckId) o;
-            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
-                    && other.getBase() == base;
-        }
-
-        @Override
-        public String toString() {
-            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-
-        public FeedConnectionId getConnectionId() {
-            return connectionId;
-        }
-
-        public int getIntakePartition() {
-            return intakePartition;
-        }
-
-        public int getBase() {
-            return base;
-        }
-
-    }
-
-    @Override
-    public void disableAcking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Acking disabled for " + connectionId);
-        }
-    }
-
-}

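The deleted FeedTrackingManager accumulates commit acks per (connection, partition, base) in a 1024-bit BitSet and treats a base as fully acknowledged once every bit is set. That accumulation step can be illustrated on its own; the following is a minimal sketch (the class and method names are illustrative, not the original API):

    import java.util.Arrays;
    import java.util.BitSet;

    public class AckAccumulatorSketch {
        // 128 bytes of 0xff, i.e. 1024 set bits, mirroring the allOnes reference above
        private static final byte[] ALL_ONES = new byte[128];
        static {
            Arrays.fill(ALL_ONES, (byte) 0xff);
        }

        private final BitSet acked = new BitSet();

        // OR a newly reported ack bitmap into the accumulated state
        public void report(byte[] commitAcks) {
            acked.or(BitSet.valueOf(commitAcks));
        }

        // true once every tracked tuple slot has been acknowledged
        public boolean fullyAcked() {
            return Arrays.equals(acked.toByteArray(), ALL_ONES);
        }

        public int pendingAcks() {
            return 128 * 8 - acked.cardinality();
        }

        public static void main(String[] args) {
            AckAccumulatorSketch s = new AckAccumulatorSketch();
            byte[] half = new byte[128];
            Arrays.fill(half, 0, 64, (byte) 0xff);   // first 512 bits acknowledged
            s.report(half);
            System.out.println(s.pendingAcks());     // 512
            byte[] rest = new byte[128];
            Arrays.fill(rest, 64, 128, (byte) 0xff); // remaining 512 bits acknowledged
            s.report(rest);
            System.out.println(s.fullyAcked());      // true
        }
    }

Because BitSet.toByteArray() drops trailing zero bytes, the byte-array comparison only succeeds when all 1024 bits are set, which is exactly the check the deleted class performs against its allOnes reference.
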
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
deleted file mode 100644
index 9d746c8..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.IFeedWork;
-import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.job.JobId;
-
-/**
- * A collection of feed management related tasks, each represented as an implementation of {@code IFeedWork}.
- */
-public class FeedWorkCollection {
-
-    private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    /**
-     * The task of subscribing to a feed to obtain data.
-     */
-    public static class SubscribeFeedWork implements IFeedWork {
-
-        private final Runnable runnable;
-
-        private final FeedConnectionRequest request;
-
-        @Override
-        public Runnable getRunnable() {
-            return runnable;
-        }
-
-        public SubscribeFeedWork(String[] locations, FeedConnectionRequest request) {
-            this.runnable = new SubscribeFeedWorkRunnable(locations, request);
-            this.request = request;
-        }
-
-        private static class SubscribeFeedWorkRunnable implements Runnable {
-
-            private final FeedConnectionRequest request;
-            private final String[] locations;
-
-            public SubscribeFeedWorkRunnable(String[] locations, FeedConnectionRequest request) {
-                this.request = request;
-                this.locations = locations;
-            }
-
-            @Override
-            public void run() {
-                try {
-                    PrintWriter writer = new PrintWriter(System.out, true);
-                    SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-                    DataverseDecl dataverseDecl = new DataverseDecl(
-                            new Identifier(request.getReceivingFeedId().getDataverse()));
-                    SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
-                    List<Statement> statements = new ArrayList<Statement>();
-                    statements.add(dataverseDecl);
-                    statements.add(subscribeStmt);
-                    QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                            QueryTranslator.ResultDelivery.SYNC);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Submitted connection requests for execution: " + request);
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in executing " + request);
-                    }
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-
-        public static class FeedSubscribeWorkEventListener implements IFeedWorkEventListener {
-
-            @Override
-            public void workFailed(IFeedWork work, Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request
-                            + " failed with exception " + e);
-                }
-            }
-
-            @Override
-            public void workCompleted(IFeedWork work) {
-                ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
-                }
-            }
-
-        }
-
-        public FeedConnectionRequest getRequest() {
-            return request;
-        }
-
-        @Override
-        public String toString() {
-            return "SubscribeFeedWork for [" + request + "]";
-        }
-
-    }
-
-    /**
-     * The task of activating a set of feeds.
-     */
-    public static class ActivateFeedWork implements IFeedWork {
-
-        private final Runnable runnable;
-
-        @Override
-        public Runnable getRunnable() {
-            return runnable;
-        }
-
-        public ActivateFeedWork(List<FeedCollectInfo> feedsToRevive) {
-            this.runnable = new FeedsActivateRunnable(feedsToRevive);
-        }
-
-        public ActivateFeedWork() {
-            this.runnable = new FeedsActivateRunnable();
-        }
-
-        private static class FeedsActivateRunnable implements Runnable {
-
-            private List<FeedCollectInfo> feedsToRevive;
-            private Mode mode = Mode.REVIVAL_POST_NODE_REJOIN; // default avoids a null mode for the no-arg constructor
-
-            public enum Mode {
-                REVIVAL_POST_NODE_REJOIN
-            }
-
-            public FeedsActivateRunnable(List<FeedCollectInfo> feedsToRevive) {
-                this.feedsToRevive = feedsToRevive;
-            }
-
-            public FeedsActivateRunnable() {
-            }
-
-            @Override
-            public void run() {
-                switch (mode) {
-                    case REVIVAL_POST_NODE_REJOIN:
-                        try {
-                            Thread.sleep(10000);
-                        } catch (InterruptedException e1) {
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Attempt to resume feed interrupted");
-                            }
-                            throw new IllegalStateException(e1.getMessage());
-                        }
-                        for (FeedCollectInfo finfo : feedsToRevive) {
-                            try {
-                                JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                                    LOGGER.info("Job:" + finfo.jobSpec);
-                                }
-                            } catch (Exception e) {
-                                if (LOGGER.isLoggable(Level.WARNING)) {
-                                    LOGGER.warning(
-                                            "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                                }
-                            }
-                        }
-                }
-            }
-
-        }
-
-    }
-}

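SubscribeFeedWork above follows a small pattern: a work item exposes a Runnable, and an IFeedWorkEventListener is told whether the work completed or failed. Decoupled from the feed classes, the pattern looks roughly like this (Work and WorkEventListener are stand-ins for the original interfaces):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class WorkPatternSketch {
        interface Work {
            Runnable getRunnable();
        }

        interface WorkEventListener {
            void workCompleted(Work w);
            void workFailed(Work w, Exception e);
        }

        // submit a work item and route success/failure to the listener
        static void submit(ExecutorService executor, Work work, WorkEventListener listener) {
            executor.submit(() -> {
                try {
                    work.getRunnable().run();
                    listener.workCompleted(work);
                } catch (Exception e) {
                    listener.workFailed(work, e);
                }
            });
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Work work = () -> () -> System.out.println("subscribing to feed...");
            submit(executor, work, new WorkEventListener() {
                public void workCompleted(Work w) { System.out.println("completed"); }
                public void workFailed(Work w, Exception e) { System.out.println("failed: " + e); }
            });
            executor.shutdown();
        }
    }

In the deleted code the listener side is FeedSubscribeWorkEventListener, which flips the connection request to ConnectionStatus.ACTIVE on completion.
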
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
deleted file mode 100644
index b30d8a7..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedWorkRequestResponseHandler implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
-
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
-
-    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
-
-    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            IClusterManagementWorkResponse response = null;
-            try {
-                response = inbox.take();
-            } catch (InterruptedException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Interrupted exception " + e.getMessage());
-                }
-                continue; // response is null here; wait for the next message
-            }
-            IClusterManagementWork submittedWork = response.getWork();
-            Map<String, String> nodeSubstitution = new HashMap<String, String>();
-            switch (submittedWork.getClusterManagementWorkType()) {
-                case ADD_NODE:
-                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
-                    int workId = addNodeWork.getWorkId();
-                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
-                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
-                    List<String> nodesAdded = resp.getNodesAdded();
-                    List<String> unsubstitutedNodes = new ArrayList<String>();
-                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
-                    int nodeIndex = 0;
-
-                    // form a mapping between the failed node and its substitute
-                    if (nodesAdded != null && nodesAdded.size() > 0) {
-                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
-                            String substitute = nodesAdded.get(nodeIndex);
-                            nodeSubstitution.put(failedNodeId, substitute);
-                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
-                            unsubstitutedNodes.remove(failedNodeId);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
-                            }
-                        }
-                    }
-                    if (unsubstitutedNodes.size() > 0) {
-                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes()
-                                .toArray(new String[] {});
-                        nodeIndex = 0;
-                        for (String unsubstitutedNode : unsubstitutedNodes) {
-                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
-                                        + unsubstitutedNode);
-                            }
-                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
-                        }
-
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
-                        }
-                    }
-
-                    // alter failed feed intake jobs
-
-                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
-                        String failedNode = entry.getKey();
-                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
-                        for (FeedJobInfo info : impactedJobInfos) {
-                            JobSpecification spec = info.getSpec();
-                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
-                            info.setSpec(spec);
-                        }
-                    }
-
-                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
-                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
-
-                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
-                        for (FeedJobInfo info : infos) {
-                            switch (info.getJobType()) {
-                                case INTAKE:
-                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
-                                    break;
-                                case FEED_CONNECT:
-                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
-                                    break;
-                            }
-                        }
-                    }
-
-                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                    try {
-                        for (FeedIntakeInfo info : revisedIntakeJobs) {
-                            hcc.startJob(info.getSpec());
-                        }
-                        Thread.sleep(2000);
-                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
-                            hcc.startJob(info.getSpec());
-                            Thread.sleep(2000);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to start revised job post failure");
-                        }
-                    }
-
-                    break;
-                case REMOVE_NODE:
-                    throw new IllegalStateException("Invalid work submitted");
-            }
-        }
-    }
-
-    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
-        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
-        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
-        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
-        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
-        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
-        OperatorDescriptorId opId = null;
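-        // Classify each user constraint: location constraints naming the failed node
-        // are marked for replacement, while constraints on operators not (yet) known
-        // to be modified are remembered as candidates.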
-        for (Constraint constraint : userConstraints) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (modifiedOperators.contains(opId)) {
-                        countConstraintsToReplace.add(constraint);
-                    } else {
-                        List<Constraint> clist = candidateConstraints.get(opId);
-                        if (clist == null) {
-                            clist = new ArrayList<Constraint>();
-                            candidateConstraints.put(opId, clist);
-                        }
-                        clist.add(constraint);
-                    }
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                    if (oldLocation.equals(failedNodeId)) {
-                        locationConstraintsToReplace.add(constraint);
-                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
-                        Map<Integer, String> newLocs = newConstraints.get(opId);
-                        if (newLocs == null) {
-                            newLocs = new HashMap<Integer, String>();
-                            newConstraints.put(opId, newLocs);
-                        }
-                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                        newLocs.put(partition, replacementNode);
-                    } else {
-                        if (modifiedOperators.contains(opId)) {
-                            locationConstraintsToReplace.add(constraint);
-                            Map<Integer, String> newLocs = newConstraints.get(opId);
-                            if (newLocs == null) {
-                                newLocs = new HashMap<Integer, String>();
-                                newConstraints.put(opId, newLocs);
-                            }
-                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                            newLocs.put(partition, oldLocation);
-                        } else {
-                            List<Constraint> clist = candidateConstraints.get(opId);
-                            if (clist == null) {
-                                clist = new ArrayList<Constraint>();
-                                candidateConstraints.put(opId, clist);
-                            }
-                            clist.add(constraint);
-                        }
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
-        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
-
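-        // For each operator that lost a partition, remove its remaining candidate
-        // location constraints as well, carrying their old locations over into the
-        // rebuilt per-partition location map.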
-        for (OperatorDescriptorId mopId : modifiedOperators) {
-            List<Constraint> clist = candidateConstraints.get(mopId);
-            if (clist != null && !clist.isEmpty()) {
-                jobSpec.getUserConstraints().removeAll(clist);
-
-                for (Constraint c : clist) {
-                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
-                        ConstraintExpression cexpr = c.getRValue();
-                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
-                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        newConstraints.get(mopId).put(partition, oldLocation);
-                    }
-                }
-            }
-        }
-
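-        // Finally, re-install one absolute location constraint per affected operator,
-        // built from the per-partition map (replacement node for failed partitions,
-        // original nodes elsewhere).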
-        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
-            OperatorDescriptorId nopId = entry.getKey();
-            Map<Integer, String> clist = entry.getValue();
-            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
-            String[] locations = new String[clist.size()];
-            for (int i = 0; i < locations.length; i++) {
-                locations[i] = clist.get(i);
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
-        }
-
-    }
-
-    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
-        feedsWaitingForResponse.put(workId, impactedJobs);
-    }
-}

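The ADD_NODE branch above builds its failed-node-to-substitute mapping in two passes: newly added nodes are assigned round-robin, and any dead node still unmatched falls back, again round-robin, to the cluster's existing participant nodes. The mapping in isolation (a sketch; node names are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class NodeSubstitutionSketch {
        static Map<String, String> substitute(List<String> deadNodes, List<String> addedNodes,
                List<String> participantNodes) {
            Map<String, String> substitution = new HashMap<>();
            List<String> unsubstituted = new ArrayList<>(deadNodes);
            if (addedNodes != null && !addedNodes.isEmpty()) {
                int i = 0;
                for (String dead : deadNodes) {
                    substitution.put(dead, addedNodes.get(i));
                    i = (i + 1) % addedNodes.size(); // round-robin over added nodes
                    unsubstituted.remove(dead);
                }
            }
            // fall back to existing participants for anything still unmatched
            int j = 0;
            for (String dead : unsubstituted) {
                substitution.put(dead, participantNodes.get(j));
                j = (j + 1) % participantNodes.size();
            }
            return substitution;
        }

        public static void main(String[] args) {
            System.out.println(substitute(Arrays.asList("nc1", "nc2", "nc3"),
                    Arrays.asList("nc4"), Arrays.asList("nc5", "nc6")));
            // every dead node maps to nc4, the single added node
        }
    }

One consequence visible in both the sketch and the original: when fewer nodes are added than died, several failed nodes share one substitute.
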
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
deleted file mode 100644
index dc02a53..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feed;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedsActivator implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedsActivator.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    private List<FeedCollectInfo> feedsToRevive;
-    private Mode mode;
-
-    public enum Mode {
-        REVIVAL_POST_CLUSTER_REBOOT,
-        REVIVAL_POST_NODE_REJOIN
-    }
-
-    public FeedsActivator() {
-        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
-    }
-
-    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
-        this.feedsToRevive = feedsToRevive;
-        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
-    }
-
-    @Override
-    public void run() {
-        switch (mode) {
-            case REVIVAL_POST_CLUSTER_REBOOT:
-                //revivePostClusterReboot();
-                break;
-            case REVIVAL_POST_NODE_REJOIN:
-                try {
-                    Thread.sleep(10000);
-                } catch (InterruptedException e1) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Attempt to resume feed interrupted");
-                    }
-                    throw new IllegalStateException(e1.getMessage());
-                }
-                for (FeedCollectInfo finfo : feedsToRevive) {
-                    try {
-                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                            LOGGER.info("Job:" + finfo.jobSpec);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                        }
-                    }
-                }
-        }
-    }
-
-    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
-        PrintWriter writer = new PrintWriter(System.out, true);
-        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-        try {
-            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
-            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
-                    new Identifier(dataset), feedPolicy, 0);
-            stmt.setForceConnect(true);
-            List<Statement> statements = new ArrayList<Statement>();
-            statements.add(dataverseDecl);
-            statements.add(stmt);
-            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    QueryTranslator.ResultDelivery.SYNC);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
-                        + feedPolicy + " Exception " + e.getMessage());
-            }
-        }
-    }
-}

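FeedsActivator and the FeedsActivateRunnable inside FeedWorkCollection share the same revival recipe: sleep briefly so the rejoined node can settle, then resubmit each saved job specification, logging per-feed failures instead of propagating them so one broken feed cannot block the rest. Schematically (JobClient and JobInfo below are stand-ins, not AsterixDB types):

    import java.util.Collections;
    import java.util.List;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class FeedRevivalSketch {
        private static final Logger LOGGER = Logger.getLogger(FeedRevivalSketch.class.getName());

        interface JobClient {
            String startJob(Object jobSpec) throws Exception;
        }

        static class JobInfo {
            String feedId;
            Object jobSpec;
        }

        static void revive(JobClient client, List<JobInfo> feeds, long settleMillis)
                throws InterruptedException {
            Thread.sleep(settleMillis); // give the rejoined node time to settle
            for (JobInfo feed : feeds) {
                try {
                    String jobId = client.startJob(feed.jobSpec);
                    LOGGER.info("Resumed feed " + feed.feedId + ", job id " + jobId);
                } catch (Exception e) {
                    // log and continue: one failed feed must not block the others
                    LOGGER.log(Level.WARNING, "Unable to resume feed " + feed.feedId, e);
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            JobInfo feed = new JobInfo();
            feed.feedId = "ExampleDataverse.ExampleFeed";
            feed.jobSpec = new Object();
            revive(spec -> "JID:1", Collections.singletonList(feed), 100);
        }
    }

The 10-second sleep in the deleted classes plays the role of settleMillis here.
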
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
deleted file mode 100644
index 77c6a54..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
+++ /dev/null
@@ -1,760 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.file;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.config.AsterixStorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
-import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
-import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
-public class ExternalIndexingOperations {
-
-    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
-    public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
-
-    static {
-        FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
-        FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
-    }
-
-    public static boolean isIndexible(ExternalDatasetDetails ds) {
-        return ds.getAdapter().equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER);
-    }
-
-    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
-        return ds.getState() != ExternalDatasetTransactionState.COMMIT;
-    }
-
-    public static boolean isValidIndexName(String datasetName, String indexName) {
-        return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName));
-    }
-
-    public static String getFilesIndexName(String datasetName) {
-        return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX);
-    }
-
-    public static int getRIDSize(Dataset dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
-        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
-    }
-
-    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
-        return IndexingConstants.getComparatorFactories((dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)));
-    }
-
-    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
-        return IndexingConstants.getBuddyBtreeComparatorFactories();
-    }
-
-    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
-            throws AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
-        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        try {
-            // Create the file system object
-            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
-            // Get paths of dataset
-            String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
-            String[] paths = path.split(",");
-
-            // Add fileStatuses to files
-            for (String aPath : paths) {
-                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
-                for (int i = 0; i < fileStatuses.length; i++) {
-                    int nextFileNumber = files.size();
-                    if (fileStatuses[i].isDirectory()) {
-                        listSubFiles(dataset, fs, fileStatuses[i], files);
-                    } else {
-                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
-                                fileStatuses[i].getPath().toUri().getPath(),
-                                new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-            // Close file system
-            fs.close();
-            if (files.size() == 0) {
-                throw new AlgebricksException("File Snapshot retrieved from external file system is empty");
-            }
-            return files;
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("Unable to get list of HDFS files " + e);
-        }
-    }
-
-    /* list all files under the directory
-     * src is expected to be a folder
-     */
-    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList<ExternalFile> files)
-            throws IOException {
-        Path path = src.getPath();
-        FileStatus[] fileStatuses = srcFs.listStatus(path);
-        for (int i = 0; i < fileStatuses.length; i++) {
-            int nextFileNumber = files.size();
-            if (fileStatuses[i].isDirectory()) {
-                listSubFiles(dataset, srcFs, fileStatuses[i], files);
-            } else {
-                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
-                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
-                        fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP));
-            }
-        }
-    }
-
-    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
-        Configuration conf = new Configuration();
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName());
-        return FileSystem.get(conf);
-    }
-
-    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
-            ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
-                    throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-        ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
-                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
-                new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
-        PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
-                localResourceMetadata, LocalResource.ExternalBTreeResource);
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
-        ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider,
-                externalFilesSnapshot, createIndex);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
-                secondarySplitsAndConstraint.second);
-        spec.addRoot(externalFilesOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    /**
- * This method creates an indexing operator that indexes records in HDFS.
-     *
-     * @param jobSpec
-     * @param itemType
-     * @param dataset
-     * @param files
-     * @param indexerDesc
-     * @return
-     * @throws Exception
-     */
-    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
-            JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files,
-            RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception {
-        ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        Map<String, String> configuration = externalDatasetDetails.getProperties();
-        IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(externalDatasetDetails.getAdapter(),
-                configuration, (ARecordType) itemType, files, true);
-        return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
-                new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
-                adapterFactory.getPartitionConstraint());
-    }
-
-    public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
-            JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
-            RecordDescriptor indexerDesc, List<ExternalFile> files) throws Exception {
-        if (files == null) {
-            files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
-        }
-        return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider);
-    }
-
-    /**
-     * At the end of this method, we expect to have 4 sets as follows:
- * metadataFiles should contain only the files that were appended or deleted, each in its original state
- * addedFiles should contain new files, with file numbers assigned starting after the max original file number
- * deletedFiles should contain files that no longer exist in the file system
- * appendedFiles should contain the new file information for existing files that were appended to
- * The method returns true only in the case of zero delta (i.e., the dataset is up to date)
-     *
-     * @param dataset
-     * @param metadataFiles
-     * @param addedFiles
-     * @param deletedFiles
-     * @param appendedFiles
-     * @return
-     * @throws MetadataException
-     * @throws AlgebricksException
-     */
-    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
-            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
-                    throws MetadataException, AlgebricksException {
-        boolean uptodate = true;
-        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
-
-        ArrayList<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
-
-        // Loop over the file system files, detecting added, appended, and replaced files
-        for (ExternalFile fileSystemFile : fileSystemFiles) {
-            boolean fileFound = false;
-            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-            while (mdFilesIterator.hasNext()) {
-                ExternalFile metadataFile = mdFilesIterator.next();
-                if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
-                    // Same file name
-                    if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
-                        // Same timestamp
-                        if (fileSystemFile.getSize() == metadataFile.getSize()) {
-                            // Same size -> no op
-                            mdFilesIterator.remove();
-                            fileFound = true;
-                        } else {
-                            // Different size -> append op
-                            metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            appendedFiles.add(fileSystemFile);
-                            fileFound = true;
-                            uptodate = false;
-                        }
-                    } else {
-                        // Same file name, Different file mod date -> delete and add
-                        metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                        deletedFiles
-                                .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0,
-                                        metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
-                                        metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
-                        fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                        fileSystemFile.setFileNumber(newFileNumber);
-                        addedFiles.add(fileSystemFile);
-                        newFileNumber++;
-                        fileFound = true;
-                        uptodate = false;
-                    }
-                }
-                if (fileFound) {
-                    break;
-                }
-            }
-            if (!fileFound) {
-                // File not stored previously in metadata -> pending add op
-                fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                fileSystemFile.setFileNumber(newFileNumber);
-                addedFiles.add(fileSystemFile);
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-
-        // Done with the file system files -> metadataFiles now contains both deleted and appended files.
-        // First, correct the file number assignment for deleted and appended files.
-        for (ExternalFile deletedFile : deletedFiles) {
-            deletedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-        for (ExternalFile appendedFile : appendedFiles) {
-            appendedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-
-        // include the remaining deleted files
-        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-        while (mdFilesIterator.hasNext()) {
-            ExternalFile metadataFile = mdFilesIterator.next();
-            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
-                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
-                        metadataFile.getSize(), metadataFile.getPendingOp()));
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-        return uptodate;
-    }
-
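-    // Creates a copy of the dataset whose external details are stamped with transaction state BEGIN;
-    // presumably used to track an in-flight refresh of the external dataset's files.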
-    public static Dataset createTransactionDataset(Dataset dataset) {
-        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
-                originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN);
-        return new Dataset(dataset.getDataverseName(), dataset.getDatasetName(),
-                dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), dataset.getNodeGroupName(),
-                dataset.getCompactionPolicy(), dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(),
-                DatasetType.EXTERNAL, dataset.getDatasetId(), dataset.getPendingOp());
-    }
-
-    public static boolean isFileIndex(Index index) {
-        return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName())));
-    }
-
-    public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
-            AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
-        String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
-                : indexDropStmt.getDataverseName();
-        String datasetName = indexDropStmt.getDatasetName();
-        String indexName = indexDropStmt.getIndexName();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
-                splitsAndConstraint.second);
-        spec.addRoot(btreeDrop);
-
-        return spec;
-    }
-
-    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
-        for (ExternalFile file : metadataFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                files.add(file);
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                for (ExternalFile appendedFile : appendedFiles) {
-                    if (appendedFile.getFileName().equals(file.getFileName())) {
-                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(),
-                                file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-        }
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
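-        // Sort by ExternalFile's natural ordering (assumed to be by file number) before rebuilding the files index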
-        Collections.sort(files);
-        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
-    }
-
-    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
-            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
-        // Create files list
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
-
-        for (ExternalFile metadataFile : metadataFiles) {
-            // Include every metadata file; appended files are included with their append flag cleared
-            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-            }
-            files.add(metadataFile);
-        }
-        // add new files
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        // add appended files
-        for (ExternalFile file : appendedFiles) {
-            files.add(file);
-        }
-
-        CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
-                index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
-    }
-
-    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-
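-        // Collect a dataflow helper factory and index info for every secondary index of the dataset;
-        // only B-tree and R-tree secondary indexes participate in the commit.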
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
-            AsterixStorageProperties storageProperties, AqlMetadataProvider metadataProvider, JobSpecification spec)
-                    throws AlgebricksException, AsterixException {
-        int numPrimaryKeys = getRIDSize(ds);
-        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
-        ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getItemTypeDataverseName(),
-                ds.getItemTypeName());
-        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType spatialType = spatialTypePair.first;
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-        }
-        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
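-        // An R-tree key stores a minimum and a maximum coordinate per dimension (the MBR),
-        // hence twice the number of dimensions in nested secondary key fields.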
-        int numNestedSecondaryKeyFields = numDimensions * 2;
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        ATypeTag keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(nestedKeyType, true);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-        }
-        // Add serializers and comparators for primary index fields.
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
-        }
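-        // The primary (RID) fields follow the nested secondary key fields in the record,
-        // so their positions are offset by numNestedSecondaryKeyFields.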
-        int[] primaryKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
-        }
-
-        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
-                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, spec));
-                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
-                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
-        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
-        spec.addRoot(compactOp);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
-                secondarySplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-}

