asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [5/6] asterixdb git commit: Refactor Messaging
Date Sat, 03 Sep 2016 12:44:50 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 529d38a..0e3bb3b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -30,7 +30,7 @@ import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 
 public class ClusterWorkExecutor implements Runnable {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index bbd400b..7dde284 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -41,7 +41,7 @@ import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index a8bc48f..c11b875 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@ import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
@@ -44,8 +45,8 @@ import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
@@ -210,8 +211,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     @Override
     public void notifyStartupComplete() throws Exception {
         //Send max resource id on this NC to the CC
-        ((NCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
-
+        ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 27a5365..1345aa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,27 +18,13 @@
  */
 package org.apache.asterix.messaging;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -47,10 +33,7 @@ import org.apache.hyracks.control.cc.NodeControllerState;
 public class CCMessageBroker implements ICCMessageBroker {
 
     private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
-    private final AtomicLong globalResourceId = new AtomicLong(0);
     private final ClusterControllerService ccs;
-    private final Set<String> nodesReportedMaxResourceId = new HashSet<>();
-    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     public CCMessageBroker(ClusterControllerService ccs) {
         this.ccs = ccs;
@@ -60,63 +43,9 @@ public class CCMessageBroker implements ICCMessageBroker {
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
         AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.getMessageType().name());
+            LOGGER.info("Received message: " + absMessage.type());
         }
-        switch (absMessage.getMessageType()) {
-            case RESOURCE_ID_REQUEST:
-                handleResourceIdRequest(message, nodeId);
-                break;
-            case REPORT_MAX_RESOURCE_ID_RESPONSE:
-                handleReportResourceMaxIdResponse(message, nodeId);
-                break;
-            case TAKEOVER_PARTITIONS_RESPONSE:
-                handleTakeoverPartitionsResponse(message);
-                break;
-            case TAKEOVER_METADATA_NODE_RESPONSE:
-                handleTakeoverMetadataNodeResponse(message);
-                break;
-            case PREPARE_PARTITIONS_FAILBACK_RESPONSE:
-                handleClosePartitionsResponse(message);
-                break;
-            case COMPLETE_FAILBACK_RESPONSE:
-                handleCompleteFailbcakResponse(message);
-                break;
-            case ACTIVE_ENTITY_TO_CC_MESSAGE:
-                handleActiveEntityMessage(message);
-                break;
-            default:
-                LOGGER.warning("Unknown message: " + absMessage.getMessageType());
-                break;
-        }
-    }
-
-    private void handleActiveEntityMessage(IMessage message) {
-        ActiveLifecycleListener.INSTANCE.receive((ActivePartitionMessage) message);
-    }
-
-    private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
-        ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
-        ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
-        reponse.setId(msg.getId());
-        //cluster is not active
-        if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
-            reponse.setResourceId(-1);
-            reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
-        } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
-            //some node has not reported max resource id
-            reponse.setResourceId(-1);
-            reponse.setException(new Exception("One or more nodes has not reported max resource id."));
-            requestMaxResourceID();
-        } else {
-            reponse.setResourceId(globalResourceId.incrementAndGet());
-        }
-        sendApplicationMessageToNC(reponse, nodeId);
-    }
-
-    private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception {
-        ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message;
-        globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get()));
-        nodesReportedMaxResourceId.add(nodeId);
+        absMessage.handle(ccs);
     }
 
     @Override
@@ -125,36 +54,4 @@ public class CCMessageBroker implements ICCMessageBroker {
         NodeControllerState state = nodeMap.get(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
     }
-
-    private void requestMaxResourceID() throws Exception {
-        //send request to NCs that have not reported their max resource ids
-        Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
-        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
-        msg.setId(NO_CALLBACK_MESSAGE_ID);
-        for (String nodeId : getParticipantNodes) {
-            if (!nodesReportedMaxResourceId.contains(nodeId)) {
-                sendApplicationMessageToNC(msg, nodeId);
-            }
-        }
-    }
-
-    private void handleTakeoverPartitionsResponse(IMessage message) {
-        TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
-    }
-
-    private void handleTakeoverMetadataNodeResponse(IMessage message) {
-        TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
-    }
-
-    private void handleCompleteFailbcakResponse(IMessage message) {
-        CompleteFailbackResponseMessage msg = (CompleteFailbackResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(msg);
-    }
-
-    private void handleClosePartitionsResponse(IMessage message) {
-        PreparePartitionsFailbackResponseMessage msg = (PreparePartitionsFailbackResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(msg);
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index c7e4ac8..9851b61 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -27,30 +27,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
@@ -112,49 +95,16 @@ public class NCMessageBroker implements INCMessageBroker {
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        try {
-            AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Received message: " + absMessage.getMessageType().name());
-            }
-            //if the received message is a response to a sent message, deliver it to the sender
-            IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-            if (callback != null) {
-                callback.deliverMessageResponse(absMessage);
-            }
-
-            //handle requests from CC
-            switch (absMessage.getMessageType()) {
-                case REPORT_MAX_RESOURCE_ID_REQUEST:
-                    reportMaxResourceId();
-                    break;
-                case TAKEOVER_PARTITIONS_REQUEST:
-                    handleTakeoverPartitons(message);
-                    break;
-                case TAKEOVER_METADATA_NODE_REQUEST:
-                    handleTakeoverMetadataNode(message);
-                    break;
-                case PREPARE_PARTITIONS_FAILBACK_REQUEST:
-                    handlePreparePartitionsFailback(message);
-                    break;
-                case COMPLETE_FAILBACK_REQUEST:
-                    handleCompleteFailbackRequest(message);
-                    break;
-                case REPLICA_EVENT:
-                    handleReplicaEvent(message);
-                    break;
-                case ACTIVE_MANAGER_MESSAGE:
-                    ((ActiveManager) appContext.getActiveManager()).submit((ActiveManagerMessage) message);
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, e.getMessage(), e);
-            }
-            throw e;
+        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message: " + absMessage.type());
         }
+        //if the received message is a response to a sent message, deliver it to the sender
+        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+        if (callback != null) {
+            callback.deliverMessageResponse(absMessage);
+        }
+        absMessage.handle(ncs);
     }
 
     public ConcurrentFramePool getMessagingFramePool() {
@@ -190,96 +140,6 @@ public class NCMessageBroker implements INCMessageBroker {
         ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
     }
 
-    private void handleTakeoverPartitons(IMessage message) throws Exception {
-        TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
-        //if the NC is shutting down, it should ignore takeover partitions request
-        if (!appContext.isShuttingdown()) {
-            try {
-                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-                remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
-            } finally {
-                //send response after takeover is completed
-                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                        appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-                sendMessageToCC(reponse, null);
-            }
-        }
-    }
-
-    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
-        try {
-            appContext.initializeMetadata(false);
-            appContext.exportMetadataNodeStub();
-        } finally {
-            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
-                    appContext.getTransactionSubsystem().getId());
-            sendMessageToCC(reponse, null);
-        }
-    }
-
-    public void reportMaxResourceId() throws Exception {
-        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
-        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
-        long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
-                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessageToCC(maxResourceIdMsg, null);
-    }
-
-    private void handleReplicaEvent(IMessage message) {
-        ReplicaEventMessage msg = (ReplicaEventMessage) message;
-        Node node = new Node();
-        node.setId(msg.getNodeId());
-        node.setClusterIp(msg.getNodeIPAddress());
-        Replica replica = new Replica(node);
-        ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
-        appContext.getReplicationManager().reportReplicaEvent(event);
-    }
-
-    private void handlePreparePartitionsFailback(IMessage message) throws Exception {
-        PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
-        /**
-         * if the metadata partition will be failed back
-         * we need to flush and close all datasets including metadata datasets
-         * otherwise we need to close all non-metadata datasets and flush metadata datasets
-         * so that their memory components will be copied to the failing back node
-         */
-        if (msg.isReleaseMetadataNode()) {
-            appContext.getDatasetLifecycleManager().closeAllDatasets();
-            //remove the metadata node stub from RMI registry
-            appContext.unexportMetadataNodeStub();
-        } else {
-            //close all non-metadata datasets
-            appContext.getDatasetLifecycleManager().closeUserDatasets();
-            //flush the remaining metadata datasets that were not closed
-            appContext.getDatasetLifecycleManager().flushAllDatasets();
-        }
-
-        //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
-                .getLocalResourceRepository();
-        for (Integer partitionId : msg.getPartitions()) {
-            localResourceRepo.addInactivePartition(partitionId);
-        }
-
-        //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
-                msg.getRequestId(), msg.getPartitions());
-        sendMessageToCC(reponse, null);
-    }
-
-    private void handleCompleteFailbackRequest(IMessage message) throws Exception {
-        CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.completeFailbackProcess();
-        } finally {
-            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
-                    msg.getRequestId(), msg.getPartitions());
-            sendMessageToCC(reponse, null);
-        }
-    }
-
     private class MessageDeliveryService implements Runnable {
         /*
          * TODO Currently this thread is not stopped when it is interrupted because
@@ -302,7 +162,7 @@ public class NCMessageBroker implements INCMessageBroker {
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
                         LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
-                                + msg.getMessageType().name(), e);
+                                + msg.type(), e);
                     } else {
                         if (LOGGER.isLoggable(Level.WARNING)) {
                             LOGGER.log(Level.WARNING, "Could not process message", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
index 7536c70..5f99dbf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -23,9 +23,9 @@ import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -44,8 +44,8 @@ public class FlushDatasetUtils {
 
     public static void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
-                    throws Exception {
-        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+            throws Exception {
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
index 015088a..1c190c9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
@@ -32,7 +32,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.asterix.common.config.AsterixBuildProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.test.runtime.ExecutionTest;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.json.JSONObject;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 7e35f11..a548b3a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -51,7 +52,6 @@ import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperati
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 3591509..4e8875b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -36,8 +36,8 @@ import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.MasterNode;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.RunStatement;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c73be7a..39a4d3b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -31,7 +31,7 @@ import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.asterix.testframework.xml.TestSuite;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -84,7 +84,7 @@ public class ExecutionTestUtil {
 
         List<ILibraryManager> libraryManagers = new ArrayList<>();
         // Adds the library manager for CC.
-        libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager());
+        libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
             IAsterixAppRuntimeContext runtimeCtx =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index bfdc834..c01499e 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -263,6 +263,11 @@
       <version>2.0.2-beta</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>18.0</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
new file mode 100644
index 0000000..b9d187d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ExceptionUtils {
+    public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
+    public static final String MISSING_PARAMETER = "Missing parameter.\n";
+    public static final String PARAMETER_NAME = "Parameter name: ";
+    public static final String EXPECTED_VALUE = "Expected value: ";
+    public static final String PASSED_VALUE = "Passed value: ";
+
+    private ExceptionUtils() {
+    }
+
+    public static String incorrectParameterMessage(String parameterName, String expectedValue, String passedValue) {
+        return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
+                + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
+    }
+
+    public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
+        if (hde == null) {
+            return new HyracksDataException(th);
+        } else {
+            hde.addSuppressed(th);
+            return hde;
+        }
+    }
+
+    public static Throwable suppress(Throwable suppressor, Throwable suppressed) {
+        if (suppressor == null) {
+            return suppressed;
+        } else if (suppressed != null) {
+            suppressor.addSuppressed(suppressed);
+        }
+        return suppressor;
+    }
+
+    public static HyracksDataException convertToHyracksDataException(Throwable throwable) {
+        if (throwable == null || throwable instanceof HyracksDataException) {
+            return (HyracksDataException) throwable;
+        }
+        return new HyracksDataException(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
index fbb9b86..5737957 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -22,7 +22,7 @@ import org.apache.asterix.common.messaging.api.IApplicationMessage;
 
 public abstract class AbstractApplicationMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
-    private long id;
+    protected long id;
 
     @Override
     public void setId(long id) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
deleted file mode 100644
index 4aba0c2..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    protected final long planId;
-    protected final int requestId;
-
-    public AbstractFailbackPlanMessage(long planId, int requestId) {
-        this.planId = planId;
-        this.requestId = requestId;
-    }
-
-    public long getPlanId() {
-        return planId;
-    }
-
-    public int getRequestId() {
-        return requestId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
deleted file mode 100644
index 6518ae5..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-    private final String nodeId;
-
-    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeId = nodeId;
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.COMPLETE_FAILBACK_REQUEST;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
deleted file mode 100644
index a036ff0..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.COMPLETE_FAILBACK_RESPONSE;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 02fe917..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-    private boolean releaseMetadataNode = false;
-    private final String nodeID;
-
-    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeID = nodeId;
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_REQUEST;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public boolean isReleaseMetadataNode() {
-        return releaseMetadataNode;
-    }
-
-    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
-        this.releaseMetadataNode = releaseMetadataNode;
-    }
-
-    public String getNodeID() {
-        return nodeID;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
deleted file mode 100644
index 467c6cb..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_RESPONSE;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
deleted file mode 100644
index bf0219c..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-
-public class ReplicaEventMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final String nodeId;
-    private final ClusterEventType event;
-    private final String nodeIPAddress;
-
-    public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
-        this.nodeId = nodeId;
-        this.nodeIPAddress = nodeIPAddress;
-        this.event = event;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPLICA_EVENT;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public ClusterEventType getEvent() {
-        return event;
-    }
-
-    public String getNodeIPAddress() {
-        return nodeIPAddress;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
deleted file mode 100644
index a2b94a7..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
-    public long maxResourceId;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
-    }
-
-    public long getMaxResourceId() {
-        return maxResourceId;
-    }
-
-    public void setMaxResourceId(long maxResourceId) {
-        this.maxResourceId = maxResourceId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
deleted file mode 100644
index d2837ce..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
-    public long maxResourceId;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
-    }
-
-    public long getMaxResourceId() {
-        return maxResourceId;
-    }
-
-    public void setMaxResourceId(long maxResourceId) {
-        this.maxResourceId = maxResourceId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
deleted file mode 100644
index daeb9c4..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.RESOURCE_ID_REQUEST;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
deleted file mode 100644
index 09c50d3..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
-
-    private long resourceId;
-    private Exception exception;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.RESOURCE_ID_RESPONSE;
-    }
-
-    public long getResourceId() {
-        return resourceId;
-    }
-
-    public void setResourceId(long resourceId) {
-        this.resourceId = resourceId;
-    }
-
-    public Exception getException() {
-        return exception;
-    }
-
-    public void setException(Exception exception) {
-        this.exception = exception;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
deleted file mode 100644
index f7a276b..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
deleted file mode 100644
index bc98a62..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final String nodeId;
-
-    public TakeoverMetadataNodeResponseMessage(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index 5d07d5d..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Integer[] partitions;
-    private final long requestId;
-    private final String nodeId;
-
-    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Request ID: " + requestId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: ");
-        for (Integer partitionId : partitions) {
-            sb.append(partitionId + ",");
-        }
-        //remove last comma
-        sb.charAt(sb.length() - 1);
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
deleted file mode 100644
index 54597d9..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Integer[] partitions;
-    private final String nodeId;
-    private final long requestId;
-
-    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5f08dd8..b93082d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -18,30 +18,12 @@
  */
 package org.apache.asterix.common.messaging.api;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.service.IControllerService;
 
 public interface IApplicationMessage extends IMessage {
 
-    public enum ApplicationMessageType {
-        RESOURCE_ID_REQUEST,
-        RESOURCE_ID_RESPONSE,
-        REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE,
-        TAKEOVER_PARTITIONS_REQUEST,
-        TAKEOVER_PARTITIONS_RESPONSE,
-        TAKEOVER_METADATA_NODE_REQUEST,
-        TAKEOVER_METADATA_NODE_RESPONSE,
-        PREPARE_PARTITIONS_FAILBACK_REQUEST,
-        PREPARE_PARTITIONS_FAILBACK_RESPONSE,
-        COMPLETE_FAILBACK_REQUEST,
-        COMPLETE_FAILBACK_RESPONSE,
-        REPLICA_EVENT,
-        ACTIVE_ENTITY_TO_CC_MESSAGE,
-        ACTIVE_MANAGER_MESSAGE
-    }
-
-    public abstract ApplicationMessageType getMessageType();
-
     /**
      * Sets a unique message id that identifies this message within an NC.
      * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
@@ -55,4 +37,16 @@ public interface IApplicationMessage extends IMessage {
      * @return The unique message id if it has been set, otherwise 0.
      */
     public long getId();
+
+    /**
+     * Handles the message upon delivery.
+     */
+    public void handle(IControllerService cs) throws HyracksDataException;
+
+    /**
+     * Gets a string representation of the message type.
+     *
+     * @return the message type as a string
+     */
+    public String type();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 7dafbd5..2290b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,9 +21,11 @@ package org.apache.asterix.common.messaging.api;
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
+    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     /**
      * Sends the passed message to the specified {@code nodeId}
+     *
      * @param msg
      * @param nodeId
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
new file mode 100644
index 0000000..74589cc
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.metadata;
+
+public class MetadataIndexImmutableProperties {
+    private final String indexName;
+    private final int datasetId;
+    private final long resourceId;
+
+    // TODO(till): should we reconsider these numbers?
+    public static final int FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID = 52;
+    public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
+
+    public MetadataIndexImmutableProperties(String indexName, int datasetId, long resourceId) {
+        this.indexName = indexName;
+        this.datasetId = datasetId;
+        this.resourceId = resourceId;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public String getIndexName() {
+        return indexName;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+
+    // Right now, we only have primary Metadata indexes. Hence, dataset name is always index name
+    public String getDatasetName() {
+        return indexName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
deleted file mode 100644
index 0591644..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.replication;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-
-public class NodeFailbackPlan {
-
-    public enum FailbackPlanState {
-        /**
-         * Initial state while selecting the nodes that will participate
-         * in the node failback plan.
-         */
-        PREPARING,
-        /**
-         * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
-         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
-         * a response is expected and need to wait for it.
-         */
-        PENDING_PARTICIPANT_REPONSE,
-        /**
-         * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
-         * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
-         * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
-         */
-        PENDING_COMPLETION,
-        /**
-         * if any of the participants fail or the failing back node itself fails during
-         * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
-         * the state is changed to FAILED.
-         */
-        FAILED,
-        /**
-         * if the state is FAILED, and all pending responses (if any) have been received,
-         * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
-         * the effects of this plan (if any).
-         */
-        PENDING_ROLLBACK
-    }
-
-    private static long planIdGenerator = 0;
-    private long planId;
-    private final String nodeId;
-    private final Set<String> participants;
-    private final Map<Integer, String> partition2nodeMap;
-    private String nodeToReleaseMetadataManager;
-    private int requestId;
-    private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
-    private FailbackPlanState state;
-
-    public static NodeFailbackPlan createPlan(String nodeId) {
-        return new NodeFailbackPlan(planIdGenerator++, nodeId);
-    }
-
-    private NodeFailbackPlan(long planId, String nodeId) {
-        this.planId = planId;
-        this.nodeId = nodeId;
-        participants = new HashSet<>();
-        partition2nodeMap = new HashMap<>();
-        pendingRequests = new HashMap<>();
-        state = FailbackPlanState.PREPARING;
-    }
-
-    public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
-        partition2nodeMap.put(partitionId, currentActiveNode);
-    }
-
-    public synchronized void addParticipant(String nodeId) {
-        participants.add(nodeId);
-    }
-
-    public synchronized void notifyNodeFailure(String failedNode) {
-        if (participants.contains(failedNode)) {
-            if (state == FailbackPlanState.PREPARING) {
-                state = FailbackPlanState.FAILED;
-            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
-                /**
-                 * if there is any pending request from this failed node,
-                 * it should be marked as completed and the plan should be marked as failed
-                 */
-                Set<Integer> failedRequests = new HashSet<>();
-                for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
-                    if (request.getNodeID().equals(failedNode)) {
-                        failedRequests.add(request.getRequestId());
-                    }
-                }
-
-                if (failedRequests.size() > 0) {
-                    state = FailbackPlanState.FAILED;
-                    for (Integer failedRequestId : failedRequests) {
-                        markRequestCompleted(failedRequestId);
-                    }
-                }
-            }
-        } else if (nodeId.equals(failedNode)) {
-            //if the failing back node is the failed node itself
-            state = FailbackPlanState.FAILED;
-            updateState();
-        }
-    }
-
-    public synchronized Set<Integer> getPartitionsToFailback() {
-        return new HashSet<>(partition2nodeMap.keySet());
-    }
-
-    public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
-        //if this is the first request
-        if (pendingRequests.size() == 0) {
-            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
-        }
-        pendingRequests.put(msg.getRequestId(), msg);
-    }
-
-    public synchronized void markRequestCompleted(int requestId) {
-        pendingRequests.remove(requestId);
-        updateState();
-    }
-
-    private void updateState() {
-        if (pendingRequests.size() == 0) {
-            switch (state) {
-                case PREPARING:
-                case FAILED:
-                    state = FailbackPlanState.PENDING_ROLLBACK;
-                    break;
-                case PENDING_PARTICIPANT_REPONSE:
-                    state = FailbackPlanState.PENDING_COMPLETION;
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
-        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
-        /**
-         * for each participant, construct a request with the partitions
-         * that will be failed back or flushed.
-         */
-        for (String participant : participants) {
-            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
-            for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) {
-                if (entry.getValue().equals(participant)) {
-                    partitionToPrepareForFailback.add(entry.getKey());
-                }
-            }
-            PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
-                    requestId++, participant, partitionToPrepareForFailback);
-            if (participant.equals(nodeToReleaseMetadataManager)) {
-                msg.setReleaseMetadataNode(true);
-            }
-            node2Partitions.add(msg);
-        }
-        return node2Partitions;
-    }
-
-    public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
-        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getPlanId() {
-        return planId;
-    }
-
-    public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
-        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
-    }
-
-    public synchronized FailbackPlanState getState() {
-        return state;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Failing back node: " + nodeId);
-        sb.append(" Participants: " + participants);
-        sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
new file mode 100644
index 0000000..06038cd
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public interface IAsterixResourceIdManager {
+
+    long createResourceId();
+
+    boolean reported(String nodeId);
+
+    void report(String nodeId, long maxResourceId);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index b49a719..e4f21a6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 
 public interface IExternalDataSourceFactory extends Serializable {
@@ -73,11 +73,11 @@ public interface IExternalDataSourceFactory extends Serializable {
             AlgebricksAbsolutePartitionConstraint constraints, int count) {
         if (constraints == null) {
             ArrayList<String> locs = new ArrayList<String>();
-            Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+            Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
             int i = 0;
             while (i < count) {
                 for (String node : stores.keySet()) {
-                    int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+                    int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
                     for (int k = 0; k < numIODevices; k++) {
                         locs.add(node);
                         i++;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index cf4ed19..87ee167 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,7 +32,7 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
     protected final FeedLogManager feedLogManager;
 
     public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            @Nonnull FeedLogManager feedLogManager, int numOfFields) {
+            FeedLogManager feedLogManager, int numOfFields) {
         this.feedLogManager = feedLogManager;
         this.numOfFields = numOfFields;
         this.ctx = ctx;


Mime
View raw message