asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/6] asterixdb git commit: Refactor Messaging
Date Sat, 03 Sep 2016 12:44:48 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
deleted file mode 100644
index 5acf9ae..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.om.util;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.NodeFailbackPlan;
-import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.event.schema.cluster.Cluster;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * A holder class for properties related to the Asterix cluster.
- */
-
-public class AsterixClusterProperties {
-    /**
-     * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE.
-     * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
-     * the nodes that are expected the join.
-     */
-
-    private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
-    public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
-    public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
-
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
-    private static final String IO_DEVICES = "iodevices";
-    private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<String, Map<String, String>>();
-
-    private final Cluster cluster;
-    private ClusterState state = ClusterState.UNUSABLE;
-
-    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
-
-    private boolean globalRecoveryCompleted = false;
-
-    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
-
-    private long clusterRequestId = 0;
-    private String currentMetadataNode = null;
-    private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
-    private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
-
-    private AsterixClusterProperties() {
-        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
-        if (is != null) {
-            try {
-                JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
-                Unmarshaller unmarshaller = ctx.createUnmarshaller();
-                cluster = (Cluster) unmarshaller.unmarshal(is);
-            } catch (JAXBException e) {
-                throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
-            }
-        } else {
-            cluster = null;
-        }
-        // if this is the CC process
-        if (AsterixAppContextInfo.getInstance() != null) {
-            if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
-                node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
-                clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
-                currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-                replicationEnabled = isReplicationEnabled();
-                autoFailover = isAutoFailoverEnabled();
-                if (autoFailover) {
-                    pendingTakeoverRequests = new HashMap<>();
-                    pendingProcessingFailbackPlans = new LinkedList<>();
-                    planId2FailbackPlanMap = new HashMap<>();
-                }
-            }
-        }
-    }
-
-    public synchronized void removeNCConfiguration(String nodeId) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Removing configuration parameters for node id " + nodeId);
-        }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
-    }
-
-    public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Registering configuration parameters for node id " + nodeId);
-        }
-        activeNcConfiguration.put(nodeId, configuration);
-
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
-
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
-        }
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void updateNodePartitions(String nodeId, boolean added) {
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
-        if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
-            }
-            resetClusterPartitionConstraint();
-            updateClusterState();
-        }
-    }
-
-    private synchronized void updateClusterState() {
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                state = ClusterState.UNUSABLE;
-                LOGGER.info("Cluster is in UNUSABLE state");
-                return;
-            }
-        }
-        //if all storage partitions are active as well as the metadata node, then the cluster is active
-        if (metadataNodeActive) {
-            state = ClusterState.ACTIVE;
-            LOGGER.info("Cluster is now ACTIVE");
-            //start global recovery
-            AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover) {
-                //if there are any pending failback requests, process them
-                if (pendingProcessingFailbackPlans.size() > 0) {
-                    processPendingFailbackPlans();
-                }
-            }
-        } else {
-            requestMetadataNodeTakeover();
-        }
-    }
-
-    /**
-     * Returns the number of IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return number of IO devices. -1 if the node id is not valid. A node id
-     *         is not valid if it does not correspond to the set of registered
-     *         Node Controllers.
-     */
-    public int getNumberOfIODevices(String nodeId) {
-        String[] ioDevs = getIODevices(nodeId);
-        return ioDevs == null ? -1 : ioDevs.length;
-    }
-
-    /**
-     * Returns the IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return a list of IO devices. null if node id is not valid. A node id is not valid
-     *         if it does not correspond to the set of registered Node Controllers.
-     */
-    public synchronized String[] getIODevices(String nodeId) {
-        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
-        if (ncConfig == null) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Configuration parameters for nodeId " + nodeId
-                        + " not found. The node has not joined yet or has left.");
-            }
-            return null;
-        }
-        return ncConfig.get(IO_DEVICES).split(",");
-    }
-
-    public ClusterState getState() {
-        return state;
-    }
-
-    public Cluster getCluster() {
-        return cluster;
-    }
-
-    public synchronized Node getAvailableSubstitutionNode() {
-        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
-        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
-    }
-
-    public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<String>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
-    }
-
-    public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
-        if (clusterPartitionConstraint == null) {
-            resetClusterPartitionConstraint();
-        }
-        return clusterPartitionConstraint;
-    }
-
-    private synchronized void resetClusterPartitionConstraint() {
-        ArrayList<String> clusterActiveLocations = new ArrayList<>();
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (p.isActive()) {
-                clusterActiveLocations.add(p.getActiveNodeId());
-            }
-        }
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                clusterActiveLocations.toArray(new String[] {}));
-    }
-
-    public boolean isGlobalRecoveryCompleted() {
-        return globalRecoveryCompleted;
-    }
-
-    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
-        this.globalRecoveryCompleted = globalRecoveryCompleted;
-    }
-
-    public boolean isClusterActive() {
-        if (cluster == null) {
-            // this is a virtual cluster
-            return true;
-        }
-        return state == ClusterState.ACTIVE;
-    }
-
-    public static int getNumberOfNodes() {
-        return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
-    }
-
-    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
-        return node2PartitionsMap.get(nodeId);
-    }
-
-    public synchronized int getNodePartitionsCount(String node) {
-        if (node2PartitionsMap.containsKey(node)) {
-            return node2PartitionsMap.get(node).length;
-        }
-        return 0;
-    }
-
-    public synchronized ClusterPartition[] getClusterPartitons() {
-        ArrayList<ClusterPartition> partitons = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            partitons.add(partition);
-        }
-        return partitons.toArray(new ClusterPartition[] {});
-    }
-
-    public String getStorageDirectoryName() {
-        if (cluster != null) {
-            return cluster.getStore();
-        }
-        // virtual cluster without cluster config file
-        return DEFAULT_STORAGE_DIR_NAME;
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
-                .getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        if (lostPartitions.size() > 0) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
-                        if (!partitionRecoveryPlan.containsKey(replica)) {
-                            List<Integer> replicaPartitions = new ArrayList<>();
-                            replicaPartitions.add(partition.getPartitionId());
-                            partitionRecoveryPlan.put(replica, replicaPartitions);
-                        } else {
-                            partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-                        }
-                    }
-                    break;
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                    .getCCApplicationContext().getMessageBroker();
-            //For each replica, send a request to takeover the assigned partitions
-            for (String replica : partitionRecoveryPlan.keySet()) {
-                Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we tried to send the request to
-                     * has failed. When the failure notification arrives, we will send any pending request
-                     * that belongs to the failed NC to a different active replica.
-                     */
-                    LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadataPartition node to register itself as the metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                .getCCApplicationContext().getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to send the request to
-             * has failed. When the failure notification arrives, a new NC will be assigned to
-             * the metadata partition and a new metadata node takeover request will be sent to it.
-             */
-            LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
-            e.printStackTrace();
-        }
-    }
-
-    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
-        for (Integer partitonId : reponse.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(reponse.getNodeId());
-        }
-        pendingTakeoverRequests.remove(reponse.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
-        currentMetadataNode = reponse.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
-                .getReplicationProperties();
-        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) {
-                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback completes
-                     */
-                    String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                            .getCCApplicationContext().getMessageBroker();
-                    //send requests to other nodes to complete on-going jobs and prepare partitions for failback
-                    Set<PreparePartitionsFailbackRequestMessage> planFailbackRequests = plan.getPlanFailbackRequests();
-                    for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) {
-                        try {
-                            messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
-                            plan.addPendingRequest(request);
-                        } catch (Exception e) {
-                            LOGGER.warning("Failed to send failback request to: " + request.getNodeID());
-                            e.printStackTrace();
-                            plan.notifyNodeFailure(request.getNodeID());
-                            revertFailedFailbackPlanEffects();
-                            break;
-                        }
-                    }
-                    /**
-                     * wait until the current plan is completed before processing the next plan.
-                     * when the current one completes or is reverted, the cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will be processed.
-                     */
-                    break;
-                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                    .getCCApplicationContext().getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId());
-                e.printStackTrace();
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
-                .getReplicationProperties();
-        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                .getCCApplicationContext().getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
-    }
-
-    public synchronized boolean isMetadataNodeActive() {
-        return metadataNodeActive;
-    }
-
-    public boolean isReplicationEnabled() {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            return cluster.getDataReplication().isEnabled();
-        }
-        return false;
-    }
-
-    public boolean isAutoFailoverEnabled() {
-        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
-    }
-
-    public synchronized JSONObject getClusterStateDescription() throws JSONException {
-        JSONObject stateDescription = new JSONObject();
-        stateDescription.put("State", state.name());
-        stateDescription.put("Metadata_Node", currentMetadataNode);
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            stateDescription.put("partition_" + partition.getPartitionId(), partition.getActiveNodeId());
-        }
-        return stateDescription;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
deleted file mode 100644
index 0e9aa0c..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.om.util;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.control.cc.ClusterControllerService;
-
-/**
- * Utility class for obtaining information on the set of Hyracks NodeController
- * processes that are running on a given host.
- */
-public class AsterixRuntimeUtil {
-
-    /**
-     * Looks up the node controllers registered at the given IP address.
-     *
-     * @param ipAddress the host address to look up
-     * @return the names of the node controllers at that address, or null if none are
-     *         registered there (plain Map.get on the freshly built map)
-     * @throws Exception propagated from building the node controller map
-     */
-    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws Exception {
-        Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
-        Set<String> nodeControllersAtLocation = nodeControllerInfo.get(ipAddress);
-        return nodeControllersAtLocation;
-    }
-
-    /**
-     * Flattens the IP-to-node-controllers map into a single list of every registered
-     * node controller name. The order follows HashMap iteration and is not defined.
-     *
-     * @throws Exception propagated from building the node controller map
-     */
-    public static List<String> getAllNodeControllers() throws Exception {
-        Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
-        List<String> nodeControllers = new ArrayList<String>();
-        for (Set<String> ncCollection : nodeControllersCollection) {
-            nodeControllers.addAll(ncCollection);
-        }
-        return nodeControllers;
-    }
-
-    /**
-     * Returns a new map from IP address to the node controllers at that address, populated
-     * through the CC context's getIPAddressNodeMap.
-     */
-    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws Exception {
-        Map<InetAddress, Set<String>> map = new HashMap<InetAddress, Set<String>>();
-        AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
-        return map;
-    }
-
-    /**
-     * Copies the cluster controller's IP-address-to-node-name map into the supplied map.
-     * The controller service is cast unconditionally to ClusterControllerService.
-     * NOTE(review): presumably this must only be called on the CC; confirm with callers.
-     *
-     * @param map destination map; existing entries for the same addresses are overwritten
-     */
-    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
-        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
-                .getCCApplicationContext().getControllerService();
-        map.putAll(ccs.getIpAddressNodeNameMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index d247490..5a88a9f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -59,6 +59,7 @@ import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -69,7 +70,6 @@ import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5ba6ad2..53bd164 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -65,6 +65,7 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -75,7 +76,6 @@ import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 4da5fd4..46a2076 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -38,9 +38,9 @@ import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 41fc0b8..01d6db7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -40,10 +40,10 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
new file mode 100644
index 0000000..2eb1be9
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+
+/**
+ * Base class for the messages exchanged while executing a node failback plan. Each message
+ * carries the id of the plan it belongs to and the id of the request within that plan.
+ */
+public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Id of the failback plan this message belongs to. */
+    protected final long planId;
+    /** Id of this request within its plan. */
+    protected final int requestId;
+
+    public AbstractFailbackPlanMessage(long planId, int requestId) {
+        this.requestId = requestId;
+        this.planId = planId;
+    }
+
+    /** @return the id of the failback plan this message belongs to */
+    public long getPlanId() {
+        return this.planId;
+    }
+
+    /** @return the id of this request within its plan */
+    public int getRequestId() {
+        return this.requestId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
new file mode 100644
index 0000000..c96bcb8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+            remoteRecoeryManager.completeFailbackProcess();
+        } catch (IOException | InterruptedException e) {
+            LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
+            hde = ExceptionUtils.convertToHyracksDataException(e);
+        } finally {
+            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
+                    requestId, partitions);
+            try {
+                broker.sendMessageToCC(reponse, null);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public String type() {
+        return "COMPLETE_FAILBACK_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
new file mode 100644
index 0000000..3c4b58c
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Response sent by the failing-back node to the CC after it has processed a
+ * {@link CompleteFailbackRequestMessage}. On the CC it is handed to
+ * {@code AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse}.
+ */
+public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    /** @return the ids of the partitions covered by the completed failback */
+    public Set<Integer> getPartitions() {
+        return this.partitions;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("Plan ID: ").append(planId)
+                .append(" Partitions: ").append(partitions)
+                .toString();
+    }
+
+    /** Delegates processing of this response to the cluster properties singleton. */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
+    }
+
+    @Override
+    public String type() {
+        return "COMPLETE_FAILBACK_RESPONSE";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
new file mode 100644
index 0000000..24f5fe8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks the progress of failing back a single node. It records which partitions move back to
+ * the node, which nodes currently host them (the participants), and the prepare requests that
+ * are still awaiting a response. It also tracks the overall {@link FailbackPlanState}. Instance
+ * state is guarded by this object's monitor.
+ */
+public class NodeFailbackPlan {
+
+    public enum FailbackPlanState {
+        /**
+         * Initial state while selecting the nodes that will participate
+         * in the node failback plan.
+         */
+        PREPARING,
+        /**
+         * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
+         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
+         * that responses are expected and must be waited for.
+         */
+        PENDING_PARTICIPANT_REPONSE,
+        /**
+         * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
+         * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
+         * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
+         */
+        PENDING_COMPLETION,
+        /**
+         * The state is changed to FAILED in either of these cases:
+         * a participant fails while PREPARING, or while it still has a pending request in
+         * PENDING_PARTICIPANT_REPONSE; or the failing back node itself fails.
+         */
+        FAILED,
+        /**
+         * If the state is FAILED and all pending responses (if any) have been received,
+         * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
+         * the effects of this plan (if any).
+         */
+        PENDING_ROLLBACK
+    }
+
+    /** Source of plan ids; only touched inside the class-level lock of {@link #createPlan(String)}. */
+    private static long planIdGenerator = 0;
+    private final long planId;
+    private final String nodeId;
+    private final Set<String> participants;
+    private final Map<Integer, String> partition2nodeMap;
+    private String nodeToReleaseMetadataManager;
+    private int requestId;
+    private final Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
+    private FailbackPlanState state;
+
+    /**
+     * Creates a new plan, in the PREPARING state, with a unique plan id.
+     * The method is synchronized on the class because {@code planIdGenerator++} is not atomic.
+     * Without the lock, concurrent callers could otherwise receive the same id.
+     *
+     * @param nodeId the node that is failing back
+     * @return the new plan
+     */
+    public static synchronized NodeFailbackPlan createPlan(String nodeId) {
+        return new NodeFailbackPlan(planIdGenerator++, nodeId);
+    }
+
+    private NodeFailbackPlan(long planId, String nodeId) {
+        this.planId = planId;
+        this.nodeId = nodeId;
+        participants = new HashSet<>();
+        partition2nodeMap = new HashMap<>();
+        pendingRequests = new HashMap<>();
+        state = FailbackPlanState.PREPARING;
+    }
+
+    /**
+     * Records that a partition must be failed back and which node currently hosts it.
+     */
+    public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
+        partition2nodeMap.put(partitionId, currentActiveNode);
+    }
+
+    /** Adds a node that must take part in this plan. */
+    public synchronized void addParticipant(String nodeId) {
+        participants.add(nodeId);
+    }
+
+    /**
+     * Updates the plan after a node failure.
+     * <ul>
+     * <li>A participant failing in PREPARING moves the plan to FAILED.</li>
+     * <li>A participant failing in PENDING_PARTICIPANT_REPONSE with outstanding requests moves the
+     * plan to FAILED. Those requests are then treated as completed, which may reach
+     * PENDING_ROLLBACK.</li>
+     * <li>The failing back node itself failing moves the plan to FAILED, and on to
+     * PENDING_ROLLBACK when no requests are pending.</li>
+     * </ul>
+     *
+     * @param failedNode id of the node that failed
+     */
+    public synchronized void notifyNodeFailure(String failedNode) {
+        if (participants.contains(failedNode)) {
+            if (state == FailbackPlanState.PREPARING) {
+                state = FailbackPlanState.FAILED;
+            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
+                // Any request pending on the failed node will never be answered: mark it as
+                // completed and mark the plan as failed.
+                Set<Integer> failedRequests = new HashSet<>();
+                for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
+                    if (request.getNodeID().equals(failedNode)) {
+                        failedRequests.add(request.getRequestId());
+                    }
+                }
+
+                if (!failedRequests.isEmpty()) {
+                    state = FailbackPlanState.FAILED;
+                    for (Integer failedRequestId : failedRequests) {
+                        markRequestCompleted(failedRequestId);
+                    }
+                }
+            }
+        } else if (nodeId.equals(failedNode)) {
+            //if the failing back node is the failed node itself
+            state = FailbackPlanState.FAILED;
+            updateState();
+        }
+    }
+
+    /** @return a copy of the ids of all partitions to fail back */
+    public synchronized Set<Integer> getPartitionsToFailback() {
+        return new HashSet<>(partition2nodeMap.keySet());
+    }
+
+    /**
+     * Registers a prepare request that awaits a response. The first one moves the plan to
+     * PENDING_PARTICIPANT_REPONSE.
+     */
+    public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
+        //if this is the first request
+        if (pendingRequests.isEmpty()) {
+            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
+        }
+        pendingRequests.put(msg.getRequestId(), msg);
+    }
+
+    /** Removes a pending request and advances the state if none remain. */
+    public synchronized void markRequestCompleted(int requestId) {
+        pendingRequests.remove(requestId);
+        updateState();
+    }
+
+    /** Caller must hold this object's monitor. */
+    private void updateState() {
+        if (pendingRequests.isEmpty()) {
+            switch (state) {
+                case PREPARING:
+                case FAILED:
+                    state = FailbackPlanState.PENDING_ROLLBACK;
+                    break;
+                case PENDING_PARTICIPANT_REPONSE:
+                    state = FailbackPlanState.PENDING_COMPLETION;
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Builds one prepare request per participant, containing the partitions that participant
+     * currently hosts. The participant set via {@link #setNodeToReleaseMetadataManager(String)}
+     * is also asked to release the metadata node. Each call consumes fresh request ids.
+     */
+    public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
+        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
+        // for each participant, construct a request with the partitions
+        // that will be failed back or flushed.
+        for (String participant : participants) {
+            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
+            for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) {
+                if (entry.getValue().equals(participant)) {
+                    partitionToPrepareForFailback.add(entry.getKey());
+                }
+            }
+            PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
+                    requestId++, participant, partitionToPrepareForFailback);
+            if (participant.equals(nodeToReleaseMetadataManager)) {
+                msg.setReleaseMetadataNode(true);
+            }
+            node2Partitions.add(msg);
+        }
+        return node2Partitions;
+    }
+
+    /**
+     * Builds the request telling the failing back node to complete the failback of all partitions
+     * in this plan. It consumes a fresh request id.
+     */
+    public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
+        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getPlanId() {
+        return planId;
+    }
+
+    /**
+     * Synchronized so the value is visible to {@link #getPlanFailbackRequests()}, which reads it
+     * under this object's monitor.
+     */
+    public synchronized void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
+        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
+    }
+
+    public synchronized FailbackPlanState getState() {
+        return state;
+    }
+
+    /** Synchronized because it iterates the (non-thread-safe) participant and partition collections. */
+    @Override
+    public synchronized String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: ").append(planId);
+        sb.append(" Failing back node: ").append(nodeId);
+        sb.append(" Participants: ").append(participants);
+        sb.append(" Partitions to Failback: ").append(partition2nodeMap.keySet());
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..e3b9fbe
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.rmi.RemoteException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Request sent to a participant node asking it to prepare partitions it currently hosts so they
+ * can be failed back to their original node. The participant first closes and/or flushes
+ * datasets and marks the partitions inactive. It then answers the CC with a
+ * {@link PreparePartitionsFailbackResponseMessage}.
+ */
+public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private boolean releaseMetadataNode = false;
+    private final String nodeID;
+
+    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeID = nodeId;
+        this.partitions = partitions;
+    }
+
+    /** @return the ids of the partitions the participant must prepare */
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    /** @return true if the participant must also unexport its metadata node stub */
+    public boolean isReleaseMetadataNode() {
+        return releaseMetadataNode;
+    }
+
+    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+        this.releaseMetadataNode = releaseMetadataNode;
+    }
+
+    /** @return the id of the participant node this request is addressed to */
+    public String getNodeID() {
+        return nodeID;
+    }
+
+    /**
+     * Includes the target node id, as {@link CompleteFailbackRequestMessage#toString()} does,
+     * so log lines identify which participant a request was sent to.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: ").append(planId);
+        sb.append(" Node ID: ").append(nodeID);
+        sb.append(" Partitions: ").append(partitions);
+        sb.append(" releaseMetadataNode: ").append(releaseMetadataNode);
+        return sb.toString();
+    }
+
+    /**
+     * Runs on the participant NC.
+     * <p>
+     * If releaseMetadataNode is set, it closes all datasets and unexports the metadata node
+     * stub. Otherwise it closes user datasets and flushes the remaining datasets. It then marks
+     * every requested partition inactive in the local resource repository and sends a
+     * {@link PreparePartitionsFailbackResponseMessage} to the CC. Failures to unexport or to send
+     * are logged and rethrown as HyracksDataException.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        // If the metadata partition will be failed back, flush and close all datasets, including
+        // metadata datasets. Otherwise close all non-metadata datasets and flush the metadata
+        // datasets, so that their memory components are copied to the failing back node.
+        if (releaseMetadataNode) {
+            appContext.getDatasetLifecycleManager().closeAllDatasets();
+            //remove the metadata node stub from RMI registry
+            try {
+                appContext.unexportMetadataNodeStub();
+            } catch (RemoteException e) {
+                LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
+                throw ExceptionUtils.convertToHyracksDataException(e);
+            }
+        } else {
+            //close all non-metadata datasets
+            appContext.getDatasetLifecycleManager().closeUserDatasets();
+            //flush the remaining metadata datasets that were not closed
+            appContext.getDatasetLifecycleManager().flushAllDatasets();
+        }
+
+        //mark the partitions to be closed as inactive
+        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+                .getLocalResourceRepository();
+        for (Integer partitionId : partitions) {
+            localResourceRepo.addInactivePartition(partitionId);
+        }
+
+        //send response after partitions prepared for failback
+        PreparePartitionsFailbackResponseMessage response = new PreparePartitionsFailbackResponseMessage(planId,
+                requestId, partitions);
+        try {
+            broker.sendMessageToCC(response, null);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public String type() {
+        return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
new file mode 100644
index 0000000..2e52773
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Response sent from a node controller to the cluster controller after the requested partitions
+ * have been prepared for failback. Processing is delegated in {@link #handle(IControllerService)}.
+ */
+public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Partition ids this response refers to; the caller's set is kept as-is (not copied). */
+    private final Set<Integer> partitions;
+
+    /**
+     * @param planId id of the failback plan, forwarded to the superclass
+     * @param requestId id of the request being answered, forwarded to the superclass
+     * @param partitions partition ids carried by this response
+     */
+    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    /**
+     * @return the partition ids carried by this response (the same set instance given to the constructor)
+     */
+    public Set<Integer> getPartitions() {
+        return this.partitions;
+    }
+
+    /** Hands this response to {@code AsterixClusterProperties.INSTANCE} for processing. */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(this);
+    }
+
+    /** @return the fixed type tag of this message */
+    @Override
+    public String type() {
+        return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
new file mode 100644
index 0000000..2aa4746
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Message handled on a node controller that forwards a cluster event about a node to the local
+ * replication manager, wrapped as a {@link ReplicaEvent}.
+ */
+public class ReplicaEventMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    /** Id of the node the event is about. */
+    private final String nodeId;
+    /** The cluster event that occurred for {@link #nodeId}. */
+    private final ClusterEventType event;
+    /** Cluster IP address of {@link #nodeId}. */
+    private final String nodeIPAddress;
+
+    public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
+        this.nodeId = nodeId;
+        this.nodeIPAddress = nodeIPAddress;
+        this.event = event;
+    }
+
+    public String getNodeId() {
+        return this.nodeId;
+    }
+
+    public ClusterEventType getEvent() {
+        return this.event;
+    }
+
+    public String getNodeIPAddress() {
+        return this.nodeIPAddress;
+    }
+
+    /**
+     * Reports the event to the replication manager of the receiving node controller.
+     * {@code cs} is expected to be a {@link NodeControllerService}.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService nodeController = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext runtimeContext =
+                (IAsterixAppRuntimeContext) nodeController.getApplicationContext().getApplicationObject();
+        Replica replica = new Replica(describeNode());
+        runtimeContext.getReplicationManager().reportReplicaEvent(new ReplicaEvent(replica, event));
+    }
+
+    /** Builds a cluster {@link Node} descriptor carrying this event's node id and cluster IP. */
+    private Node describeNode() {
+        Node node = new Node();
+        node.setId(nodeId);
+        node.setClusterIp(nodeIPAddress);
+        return node;
+    }
+
+    @Override
+    public String type() {
+        return "REPLICA_EVENT";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..1604e29
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Message carrying a node's maximum local resource id. {@link #send(NodeControllerService)} builds and
+ * sends it to the cluster controller; {@link #handle(IControllerService)} records it with the
+ * application's resource id manager.
+ */
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
+    /** Max resource id reported by {@link #src}. */
+    private final long maxResourceId;
+    /** Id of the reporting node. */
+    private final String src;
+
+    public ReportMaxResourceIdMessage(String src, long maxResourceId) {
+        this.src = src;
+        this.maxResourceId = maxResourceId;
+    }
+
+    public long getMaxResourceId() {
+        return this.maxResourceId;
+    }
+
+    /** Passes the reporting node id and its max resource id to the resource id manager. */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        IAsterixResourceIdManager idManager = AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+        idManager.report(src, maxResourceId);
+    }
+
+    /**
+     * Computes this node's max resource id and sends it to the cluster controller. The value sent is never
+     * lower than {@code MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID}.
+     *
+     * @param cs the local node controller service
+     * @throws HyracksDataException if sending fails (the failure is also logged at SEVERE)
+     */
+    public static void send(NodeControllerService cs) throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeContext =
+                (IAsterixAppRuntimeContext) cs.getApplicationContext().getApplicationObject();
+        long reportedMax = Math.max(runtimeContext.getLocalResourceRepository().getMaxResourceID(),
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+        ReportMaxResourceIdMessage report = new ReportMaxResourceIdMessage(cs.getId(), reportedMax);
+        try {
+            // The broker lookup stays inside the try so a failure there is logged and converted as well.
+            INCMessageBroker broker = (INCMessageBroker) cs.getApplicationContext().getMessageBroker();
+            broker.sendMessageToCC(report, null);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public String type() {
+        return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
new file mode 100644
index 0000000..3e2becf
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Asks a node controller to report its max resource id. Handling it on the node calls
+ * {@link ReportMaxResourceIdMessage#send(NodeControllerService)}.
+ */
+public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+
+    /** {@code cs} is expected to be a {@link NodeControllerService}. */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        final NodeControllerService nodeController = (NodeControllerService) cs;
+        ReportMaxResourceIdMessage.send(nodeController);
+    }
+
+    @Override
+    public String type() {
+        return "REPORT_MAX_RESOURCE_ID_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..5e7d808
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Request, handled by the cluster controller, for a new global resource id on behalf of node {@code src}.
+ * The answer is sent back to {@code src} as a {@link ResourceIdRequestResponseMessage}.
+ */
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    /** Id of the requesting node; it receives the response. */
+    private final String src;
+
+    public ResourceIdRequestMessage(String src) {
+        this.src = src;
+    }
+
+    /**
+     * Generates a resource id, or records why none could be generated, and sends the response to
+     * {@link #src}. While the cluster is active, participant nodes that have not reported their max
+     * resource id are also asked to do so.
+     * <p>
+     * A failure while asking those nodes is deferred until after the response has been sent, so it can no
+     * longer prevent the requester from receiving its answer. The deferred failure is then rethrown.
+     *
+     * @throws HyracksDataException wrapping any failure to generate the id, ask the nodes, or send the response
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        try {
+            ICCMessageBroker broker =
+                    (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+            ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
+            response.setId(id);
+            Exception maxIdRequestFailure = null;
+            if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
+                response.setResourceId(-1);
+                response.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+            } else {
+                IAsterixResourceIdManager resourceIdManager =
+                        AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+                response.setResourceId(resourceIdManager.createResourceId());
+                if (response.getResourceId() < 0) {
+                    // A negative id is reported to the requester as "some node has not reported its max id".
+                    response.setException(new Exception("One or more nodes has not reported max resource id."));
+                }
+                try {
+                    requestMaxResourceID(resourceIdManager, broker);
+                } catch (Exception e) {
+                    // Defer so the response below is still sent to the requester.
+                    maxIdRequestFailure = e;
+                }
+            }
+            broker.sendApplicationMessageToNC(response, src);
+            if (maxIdRequestFailure != null) {
+                throw maxIdRequestFailure;
+            }
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Sends one {@link ReportMaxResourceIdRequestMessage}, with no callback, to every participant node that
+     * {@code resourceIdManager} says has not reported yet.
+     */
+    private void requestMaxResourceID(IAsterixResourceIdManager resourceIdManager, ICCMessageBroker broker)
+            throws Exception {
+        Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+        msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
+        for (String nodeId : participantNodes) {
+            if (!resourceIdManager.reported(nodeId)) {
+                broker.sendApplicationMessageToNC(msg, nodeId);
+            }
+        }
+    }
+
+    @Override
+    public String type() {
+        return "RESOURCE_ID_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
new file mode 100644
index 0000000..62c2163
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Answer to a {@link ResourceIdRequestMessage}. It carries either a generated resource id or, when none
+ * could be generated, an exception describing why.
+ */
+public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+
+    /** The generated resource id; negative when no id could be generated. */
+    private long resourceId;
+    /** Why no resource id could be generated, or {@code null} if never set. */
+    private Exception exception;
+
+    public void setResourceId(long resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public long getResourceId() {
+        return this.resourceId;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+    public Exception getException() {
+        return this.exception;
+    }
+
+    /**
+     * Intentionally a no-op: the response is consumed by the callback registered for the original request.
+     * TODO(review): consider moving that callback logic here and dropping callbacks altogether.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        // nothing to do
+    }
+
+    @Override
+    public String type() {
+        return "RESOURCE_ID_RESPONSE";
+    }
+}


Mime
View raw message