asterixdb-notifications mailing list archives

From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: PREVIEW: Customizable NC Startup Sequence
Date Wed, 04 Jan 2017 15:03:22 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1414

Change subject: PREVIEW: Customizable NC Startup Sequence
......................................................................

PREVIEW: Customizable NC Startup Sequence

Preview implementation to discuss the design.
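
To make the design easier to discuss, here is a rough sketch of how the new pieces are intended
to compose: a CC-side fault tolerance strategy picks an ordered list of INCLifecycleTask instances
for a joining NC, and the NC executes them in order. The helper methods (buildStartupSequence,
runStartupSequence) and the exact ordering shown are illustrative assumptions only; they are not
part of this patch.

    // Illustrative sketch only (imports omitted): these helpers are hypothetical;
    // the real task selection lives in the fault tolerance strategies added below.
    List<INCLifecycleTask> buildStartupSequence(boolean metadataNode, Set<Integer> partitions) {
        List<INCLifecycleTask> tasks = new ArrayList<>();
        tasks.add(new StartReplicationServiceTask());          // only when replication is enabled
        tasks.add(new LocalRecoveryTask(partitions));           // or remote recovery, per strategy
        tasks.add(new MetadataBootstrapTask());                 // only on the metadata node
        tasks.add(new ExternalLibrarySetupTask(metadataNode));
        tasks.add(new ReportMaxResourceIdTask());
        tasks.add(new CheckpointTask());
        tasks.add(new StartLifecycleComponentsTask());
        tasks.add(new BindMetadataNodeTask(true));              // export the metadata node stub
        return tasks;
    }

    // The NC then executes whatever sequence the CC handed back, in order:
    void runStartupSequence(List<INCLifecycleTask> tasks, IControllerService cs) throws HyracksDataException {
        for (INCLifecycleTask task : tasks) {
            task.start(cs);
        }
    }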

Change-Id: I074c48bb1994c9d2be8706e9a2ac1eab97756fc2
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/cluster.xml
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
M asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
M asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsRequestMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsResponseMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskRequestMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskResponseMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/FaultToleranceStrategyFactory.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/IFaultToleranceStrategy.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/FaultToleranceManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
57 files changed, 2,673 insertions(+), 883 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/14/1414/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index bdce0ca..1df94d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -30,8 +30,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b1ca062..3f8227e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -29,26 +29,25 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -136,12 +135,14 @@
 
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
+    private final boolean initialRun;
 
-    public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions)
+    public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions, boolean initialRun)
             throws AsterixException, InstantiationException, IllegalAccessException,
             ClassNotFoundException, IOException {
         List<AsterixExtension> allExtensions = new ArrayList<>();
         this.ncApplicationContext = ncApplicationContext;
+        this.initialRun = initialRun;
         PropertiesAccessor propertiesAccessor =
                 PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
         compilerProperties = new CompilerProperties(propertiesAccessor);
@@ -163,7 +164,7 @@
     }
 
     @Override
-    public void initialize(boolean initialRun) throws IOException, ACIDException {
+    public void initialize() throws IOException, ACIDException {
         Logger.getLogger("org.apache.asterix").setLevel(externalProperties.getLogLevel());
         Logger.getLogger("org.apache.hyracks").setLevel(externalProperties.getLogLevel());
 
@@ -194,7 +195,7 @@
 
         IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
         SystemState systemState = recoveryMgr.getSystemState();
-        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
+        if (systemState == SystemState.INITIAL_RUN || systemState == SystemState.NEW_UNIVERSE) {
             //delete any storage data before the resource factory is initialized
             localResourceRepository.deleteStorageData(true);
         }
@@ -208,7 +209,7 @@
         activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
                 feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
 
-        if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
+        if (replicationProperties.isParticipant(ncApplicationContext.getNodeId())) {
             String nodeId = ncApplicationContext.getNodeId();
 
             replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -227,10 +228,8 @@
              * add the partitions that will be replicated in this node as inactive partitions
              */
             //get nodes which replicate to this node
-            Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId);
-            //remove the node itself
-            replicationClients.remove(nodeId);
-            for (String clientId : replicationClients) {
+            Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
+            for (String clientId : remotePrimaryReplicas) {
                 //get the partitions of each client
                 ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId);
                 for (ClusterPartition partition : clientPartitions) {
@@ -473,4 +472,8 @@
         return ncExtensionManager;
     }
 
+    @Override
+    public boolean isInitialRun() {
+        return initialRun;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
new file mode 100644
index 0000000..1462070
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class BindMetadataNodeTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final boolean exportStub;
+
+    public BindMetadataNodeTask(boolean exportStub) {
+        this.exportStub = exportStub;
+    }
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        try {
+            if (exportStub) {
+                runtimeContext.exportMetadataNodeStub();
+            } else {
+                runtimeContext.unexportMetadataNodeStub();
+            }
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
new file mode 100644
index 0000000..a25b877
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CheckpointTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
+        checkpointMgr.doSharpCheckpoint();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
new file mode 100644
index 0000000..69ef0a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ExternalLibrarySetupTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final boolean metadataNode;
+
+    public ExternalLibrarySetupTask(boolean metadataNode) {
+        this.metadataNode = metadataNode;
+    }
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        try {
+            ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
new file mode 100644
index 0000000..ac99a4c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class LocalRecoveryTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    public LocalRecoveryTask(Set<Integer> partitions) {
+        this.partitions = partitions;
+    }
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        try {
+            runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+        } catch (IOException | ACIDException e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
new file mode 100644
index 0000000..9ca245e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class MetadataBootstrapTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        try {
+            SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
+            appContext.initializeMetadata(state == SystemState.NEW_UNIVERSE);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
new file mode 100644
index 0000000..1884fc3
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportMaxResourceIdTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        ReportMaxResourceIdMessage.send((NodeControllerService) cs);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
new file mode 100644
index 0000000..360b7ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+
+public class StartLifecycleComponentsTask implements INCLifecycleTask {
+
+    private static final Logger LOGGER = Logger.getLogger(StartLifecycleComponentsTask.class.getName());
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        NCApplicationContext appContext = ncs.getApplicationContext();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting lifecycle components");
+        }
+        Map<String, String> lifecycleMgmtConfiguration = new HashMap<>();
+        String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+        String dumpPath = metadataProperties.getCoredumpPath(appContext.getNodeId());
+        lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Coredump directory for NC is: " + dumpPath);
+        }
+        ILifeCycleComponentManager lccm = appContext.getLifeCycleComponentManager();
+        lccm.configure(lifecycleMgmtConfiguration);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Configured:" + lccm);
+        }
+        appContext.setStateDumpHandler(new AsterixStateDumpHandler(appContext.getNodeId(), lccm.getDumpPath(), lccm));
+        lccm.startAll();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
new file mode 100644
index 0000000..57405ec
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartReplicationServiceTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void start(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        try {
+            //Open replication channel
+            runtimeContext.getReplicationChannel().start();
+            final IReplicationManager replicationManager = runtimeContext.getReplicationManager();
+            //Check the state of remote replicas
+            replicationManager.initializeReplicasState();
+            //Start replication after the state of remote replicas has been initialized.
+            replicationManager.startReplicationThreads();
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
new file mode 100644
index 0000000..005d516
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.replication.IFaultToleranceStrategy;
+import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(AutoFaultToleranceStrategy.class.getName());
+    private long clusterRequestId = 0;
+
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>();
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>();
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();
+    private String currentMetadataNode;
+    private boolean metadataNodeActive;
+    private IClusterStateManager clusterManager;
+    private ICCMessageBroker messageBroker;
+    private IReplicationStrategy replicationStrategy;
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        if (failedNodes.contains(nodeId)) {
+            prepareFailbackPlan(nodeId);
+            return;
+        }
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = true;
+            clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+        }
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            notifyFailbackPlansNodeFailure(nodeId);
+            revertFailedFailbackPlanEffects();
+            return;
+        }
+        //an active node failed
+        failedNodes.add(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = false;
+            clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
+        }
+        validateClusterState();
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+        notifyFailbackPlansNodeFailure(nodeId);
+        requestPartitionsTakeover(nodeId);
+    }
+
+    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+        List<String> remoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId)
+                .collect(Collectors.toList());
+        String nodeIdAddress = "";
+        Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+        //in case the node joined with a new IP address, we need to send it to the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+                }
+            }
+        }
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to take over
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        if (!lostPartitions.isEmpty()) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partition
+                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
+                        break;
+                    }
+                }
+            }
+
+            if (partitionRecoveryPlan.size() == 0) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            //For each replica, send a request to take over the assigned partitions
+            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we tried to send the request to
+                     * has failed. When the failure notification arrives, we will send any pending request
+                     * that belongs to the failed NC to a different active replica.
+                     */
+                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
+                }
+            }
+        }
+    }
+
+    private boolean addActiveReplica(String replica, ClusterPartition partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId);
+        clusterManager.getClusterPartitons();
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition's original node is the returning node,
+                 * add it to the list of the partitions which will be failed back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    private synchronized void processPendingFailbackPlans() {
+        ClusterState state = clusterManager.getState();
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we sent any requests.
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) {
+                        //partition expected to be returned to the failing back node
+                        clusterManager.updateClusterPartition(partitionId, failbackNode, false);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then the metadata node will change after the failback completes
+                     */
+                    String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                        clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+                    }
+
+                    //force new jobs to wait
+                    clusterManager.forceIntoState(ClusterState.REBALANCING);
+                    handleFailbackRequests(plan, messageBroker);
+                    /**
+                     * wait until the current plan is completed before processing the next plan.
+                     * when the current one completes or is reverted, the cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will be processed.
+                     */
+                    break;
+                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        ClusterPartition[] clusterPartitons = clusterManager.getClusterPartitons();
+        Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>();
+        for (ClusterPartition partition : clusterPartitons) {
+            clusterPartitionsMap.put(partition.getPartitionId(), partition);
+        }
+        for (ClusterPartition partition : clusterPartitons) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitionsMap.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getRequestId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+        return nodePartitions;
+    }
+
+    @Override
+    public synchronized void process(TakeoverPartitionsResponseMessage response) throws HyracksDataException {
+        for (Integer partitonId : response.getPartitions()) {
+            clusterManager.updateClusterPartition(partitonId, response.getNodeId(), true);
+        }
+        pendingTakeoverRequests.remove(response.getRequestId());
+        validateClusterState();
+    }
+
+    @Override
+    public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException {
+        currentMetadataNode = response.getNodeId();
+        metadataNodeActive = true;
+        clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+        validateClusterState();
+    }
+
+    private void validateClusterState() throws HyracksDataException {
+        clusterManager.updateClusterState();
+        ClusterState newState = clusterManager.getState();
+        // PENDING: all partitions are active but metadata node is not
+        if (newState == ClusterState.PENDING) {
+            requestMetadataNodeTakeover();
+        } else if (newState == ClusterState.ACTIVE) {
+            processPendingFailbackPlans();
+        }
+    }
+
+    @Override
+    public synchronized void process(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back node
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    @Override
+    public synchronized void process(CompleteFailbackResponseMessage response) throws HyracksDataException {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to take over the metadata node
+        ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
+        }
+    }
+
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker) {
+        AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy();
+        ft.clusterManager = clusterManager;
+        ft.messageBroker = messageBroker;
+        ft.replicationStrategy = replicationStrategy;
+        ft.currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
+        ft.metadataNodeActive = false;
+        return ft;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
new file mode 100644
index 0000000..d08f8ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.ReplayPartitionLogsRequestMessage;
+import org.apache.asterix.runtime.message.ReplayPartitionLogsResponseMessage;
+import org.apache.asterix.runtime.replication.IFaultToleranceStrategy;
+import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(MetadataNodeFaultToleranceStrategy.class.getName());
+    private IClusterStateManager clusterManager;
+    private String currentMetadataNodeId;
+    private IReplicationStrategy replicationStrategy;
+    private ICCMessageBroker messageBroker;
+    private int requestId = 0;
+    private Map<Integer, ReplayPartitionLogsRequestMessage> pendingReplayLogs = new HashMap<>();
+    private final Set<String> standbyMetadataReplica = new HashSet<>();
+    private final Set<String> failedNodes = new HashSet<>();
+
+    @Override
+    public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        //TODO if a rejoining node is the metadata node, ask it to recover from a standby node.
+        //TODO if a rejoining node is a metadata replica, it needs to take a copy of the metadata again.
+        failedNodes.remove(nodeId);
+        clusterManager.updateNodePartitions(nodeId, true);
+        if (nodeId.equals(currentMetadataNodeId)) {
+            clusterManager.updateMetadataNode(currentMetadataNodeId, true);
+        }
+        clusterManager.updateClusterState();
+    }
+
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        failedNodes.add(nodeId);
+        standbyMetadataReplica.remove(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(currentMetadataNodeId)) {
+            clusterManager.updateMetadataNode(currentMetadataNodeId, false);
+        }
+        clusterManager.updateClusterState();
+        // If the failed node is the metadata node, ask its replicas to replay any committed jobs
+        if (nodeId.equals(currentMetadataNodeId)) {
+            int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition()
+                    .getPartitionId();
+            Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId));
+            Set<Replica> activeRemoteReplicas = replicationStrategy.getRemoteReplicas(currentMetadataNodeId).stream()
+                    .filter(replica -> !failedNodes.contains(replica.getId())).collect(Collectors.toSet());
+            //TODO Do election to identify the node with the latest state
+            for (Replica replica : activeRemoteReplicas) {
+                int currentRequestId = requestId++;
+                ReplayPartitionLogsRequestMessage msg = new ReplayPartitionLogsRequestMessage(currentRequestId,
+                        metadataPartition);
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica.getId());
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+                    continue;
+                }
+                pendingReplayLogs.put(currentRequestId, msg);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void process(ReplayPartitionLogsResponseMessage msg) throws HyracksDataException {
+        pendingReplayLogs.remove(msg.getRequestId());
+        standbyMetadataReplica.add(msg.getNodeId());
+        LOGGER.log(Level.INFO, "Standby Metadata Nodes: " + standbyMetadataReplica);
+    }
+
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker) {
+        MetadataNodeFaultToleranceStrategy ft = new MetadataNodeFaultToleranceStrategy();
+        ft.currentMetadataNodeId = clusterManager.getCurrentMetadataNodeId();
+        ft.clusterManager = clusterManager;
+        ft.replicationStrategy = replicationStrategy;
+        ft.messageBroker = messageBroker;
+        return ft;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
new file mode 100644
index 0000000..d22d253
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.runtime.message.StartupTaskRequestMessage;
+import org.apache.asterix.runtime.message.StartupTaskResponseMessage;
+import org.apache.asterix.runtime.replication.IFaultToleranceStrategy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
+    IClusterStateManager clusterManager;
+    private String metadataNodeId;
+    private ICCMessageBroker messageBroker;
+    private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        pendingStartupCompletionNodes.add(nodeId);
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        pendingStartupCompletionNodes.remove(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(metadataNodeId)) {
+            clusterManager.updateMetadataNode(metadataNodeId, false);
+        }
+        clusterManager.updateClusterState();
+    }
+
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker) {
+        NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy();
+        ft.clusterManager = clusterManager;
+        ft.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
+        ft.messageBroker = messageBroker;
+        return ft;
+    }
+
+    @Override
+    public void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState());
+        StartupTaskResponseMessage response = new StartupTaskResponseMessage(0, nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+        pendingStartupCompletionNodes.remove(msg.getNodeId());
+        if (msg.isSuccess()) {
+            clusterManager.updateNodePartitions(msg.getNodeId(), true);
+            if (msg.getNodeId().equals(metadataNodeId)) {
+                clusterManager.updateMetadataNode(metadataNodeId, true);
+            }
+            clusterManager.updateClusterState();
+        } else {
+            LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. ", msg.getException());
+        }
+    }
+
+    private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        if (state == SystemState.CORRUPTED) {
+            //need to perform local recovery for node partitions
+            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(nodeId))
+                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            tasks.add(rt);
+        }
+        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+        if (isMetadataNode) {
+            tasks.add(new MetadataBootstrapTask());
+        }
+        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new CheckpointTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        if (isMetadataNode) {
+            tasks.add(new BindMetadataNodeTask(true));
+        }
+        return tasks;
+    }
+}
\ No newline at end of file
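
For context, process(StartupTaskRequestMessage) above answers each joining NC with the ordered
task list produced by buildNCStartupSequence. The NC-side handling of that response is part of
this change but not quoted in this excerpt; the sketch below is a hypothetical illustration of
what consuming the list could look like (class and method names are made up; only
INCLifecycleTask and IControllerService come from the patch):

    import java.util.List;

    import org.apache.asterix.common.api.INCLifecycleTask;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.service.IControllerService;

    // Hypothetical sketch, not code from this change.
    public class StartupTaskRunner {

        // Runs the startup tasks received from the CC, in order; a failure aborts the sequence.
        public static void runAll(List<INCLifecycleTask> tasks, IControllerService cs) throws HyracksDataException {
            for (INCLifecycleTask task : tasks) {
                task.start(cs);
            }
        }
    }

On completion or failure, the NC presumably reports back through NCLifecycleTaskReportMessage,
which process(NCLifecycleTaskReportMessage) above uses to either activate the node's partitions
or log the startup error.
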
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8998c6b..9775329 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,24 +20,19 @@
 
 import java.io.File;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
@@ -46,13 +41,11 @@
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.message.StartupTaskRequestMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
-import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.kohsuke.args4j.CmdLineException;
@@ -62,13 +55,11 @@
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
 
-    @Option(name = "-initial-run",
-            usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
+    @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
     public boolean initialRun = false;
 
-    @Option(name = "-virtual-NC",
-            usage = "A flag indicating if this NC is running on virtual cluster " + "(default: false)",
-            required = false)
+    @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster "
+            + "(default: false)", required = false)
     public boolean virtualNC = false;
 
     private INCApplicationContext ncApplicationContext = null;
@@ -91,8 +82,8 @@
             parser.printUsage(System.err);
             throw e;
         }
-        ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
-                ncAppCtx.getLifeCycleComponentManager()));
+        ncAppCtx.setThreadFactory(
+                new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -102,81 +93,47 @@
         final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
 
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", (controllerService)
-                    .getConfiguration().clusterNetPublicIPAddress);
+            System.setProperty("java.rmi.server.hostname",
+                    (controllerService).getConfiguration().clusterNetPublicIPAddress);
         }
-        runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions(), initialRun);
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
             }
             updateOnNodeJoin();
         }
-        runtimeContext.initialize(initialRun);
+        runtimeContext.initialize();
         ncApplicationContext.setApplicationObject(runtimeContext);
-        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext)
-                .getMessagingProperties();
+        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext).getMessagingProperties();
         messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         ncApplicationContext.setMessageBroker(messageBroker);
         MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
                 (NCMessageBroker) messageBroker, messagingProperties);
         ncApplicationContext.setMessagingChannelInterfaceFactory(interfaceFactory);
 
-        boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-        boolean autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-        if (initialRun) {
-            LOGGER.info("System is being initialized. (first run)");
-        } else {
-            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-            systemState = recoveryMgr.getSystemState();
+        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        systemState = recoveryMgr.getSystemState();
 
+        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("System is in a state: " + systemState);
+                LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
+                LOGGER.info("Node ID: " + nodeId);
+                LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
+                LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
             }
 
-            //do not attempt to perform remote recovery if this is a virtual NC
-            if (autoFailover && !virtualNC) {
-                if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
-                    //Start failback process
-                    IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
-                    remoteRecoveryMgr.startFailbackProcess();
-                    systemState = SystemState.RECOVERING;
-                    pendingFailbackCompletion = true;
-                }
-            } else {
-                //recover if the system is corrupted by checking system state.
-                if (systemState == SystemState.CORRUPTED) {
-                    recoveryMgr.startRecovery(true);
-                }
-            }
+            PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                    .getLocalResourceRepository();
+            localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
-        /**
-         * if the node pending failback completion, the replication channel
-         * should not be opened to avoid other nodes connecting to it before
-         * the node completes its failback. CC will notify other replicas once
-         * this node is ready to receive replication requests.
-         */
-        if (replicationEnabled && !pendingFailbackCompletion) {
-            startReplicationService();
-        }
+        performLocalCleanUp();
     }
 
     protected List<AsterixExtension> getExtensions() {
         return Collections.emptyList();
-    }
-
-    private void startReplicationService() throws InterruptedException {
-        //Open replication channel
-        runtimeContext.getReplicationChannel().start();
-
-        //Check the state of remote replicas
-        runtimeContext.getReplicationManager().initializeReplicasState();
-
-        //Start replication after the state of remote replicas has been initialized.
-        runtimeContext.getReplicationManager().startReplicationThreads();
     }
 
     @Override
@@ -202,63 +159,8 @@
 
     @Override
     public void notifyStartupComplete() throws Exception {
-        //Send max resource id on this NC to the CC
-        ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
-        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
-                LOGGER.info("Node ID: " + nodeId);
-                LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
-                LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
-            }
-
-            PersistentLocalResourceRepository localResourceRepository =
-                    (PersistentLocalResourceRepository) runtimeContext
-                            .getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
-        }
-
-        isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
-        if (isMetadataNode && !pendingFailbackCompletion) {
-            runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
-        }
-        ExternalLibraryUtils.setUpExternaLibraries(runtimeContext.getLibraryManager(),
-                isMetadataNode && !pendingFailbackCompletion);
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting lifecycle components");
-        }
-
-        Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
-        String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
-        String dumpPath = metadataProperties.getCoredumpPath(nodeId);
-        lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Coredump directory for NC is: " + dumpPath);
-        }
-        ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
-        lccm.configure(lifecycleMgmtConfiguration);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Configured:" + lccm);
-        }
-        ncApplicationContext.setStateDumpHandler(
-                new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm));
-
-        lccm.startAll();
-
-        if (!pendingFailbackCompletion) {
-            ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
-            checkpointMgr.doSharpCheckpoint();
-
-            if (isMetadataNode) {
-                runtimeContext.exportMetadataNodeStub();
-            }
-        }
-
-        //Clean any temporary files
-        performLocalCleanUp();
+        // Request startup tasks from CC
+        StartupTaskRequestMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
     }
 
     private void performLocalCleanUp() {
@@ -281,8 +183,7 @@
     }
 
     private void updateOnNodeJoin() {
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(nodeId)) {
             metadataProperties.getNodeNames().add(nodeId);
             Cluster cluster = ClusterProperties.INSTANCE.getCluster();
@@ -290,8 +191,7 @@
                 throw new IllegalStateException("No cluster configuration found for this instance");
             }
             String asterixInstanceName = metadataProperties.getInstanceName();
-            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext)
-                    .getTransactionProperties();
+            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext).getTransactionProperties();
             Node self = null;
             List<Node> nodes;
             if (cluster.getSubstituteNodes() != null) {
@@ -334,4 +234,4 @@
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 8f0b694..26697ca 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -19,14 +19,23 @@
 <cluster xmlns="cluster">
   <instance_name>asterix</instance_name>
   <store>storage</store>
+  <metadata_node>nc1</metadata_node>
 
-  <data_replication>
+  <high_availability>
     <enabled>false</enabled>
-    <replication_port>2016</replication_port>
-    <replication_factor>2</replication_factor>
-    <auto_failover>false</auto_failover>
-    <replication_time_out>30</replication_time_out>
-  </data_replication>
+    <data_replication>
+      <strategy>metadata_only</strategy>
+      <replication_port>2016</replication_port>
+      <replication_factor>2</replication_factor>
+      <replication_time_out>30</replication_time_out>
+    </data_replication>
+    <fault_tolerance>
+      <strategy>metadata_node</strategy>
+      <replica>
+        <node_id>nc2</node_id>
+      </replica>
+    </fault_tolerance>
+  </high_availability>
 
   <master_node>
     <id>master</id>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
index 8adc10c..f96b108 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
@@ -1,6 +1,4 @@
 {"config": {
-    "enabled": false,
-    "factor": 2,
     "log.batchsize": 4096,
     "log.buffer.numpages": 8,
     "log.buffer.pagesize": 131072,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
index 7bc9421..750e752 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
@@ -66,7 +66,7 @@
 
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
 
-    public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException;
+    public void initialize() throws IOException, ACIDException, AsterixException;
 
     public void setShuttingdown(boolean b);
 
@@ -107,4 +107,9 @@
      * @throws RemoteException
      */
     public void unexportMetadataNodeStub() throws RemoteException;
+
+    /**
+     * @return True if the {@link IAppRuntimeContext} is being initialized for the first time.
+     */
+    boolean isInitialRun();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
new file mode 100644
index 0000000..f40e127
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+@FunctionalInterface
+public interface INCLifecycleTask extends Serializable {
+
+    void start(IControllerService cs) throws HyracksDataException;
+}
\ No newline at end of file
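
Since INCLifecycleTask is the extension point for the customizable startup sequence (the concrete
tasks are the CheckpointTask, MetadataBootstrapTask, etc. imported by NoFaultToleranceStrategy
above), a minimal illustration of a custom task is sketched here. The class is hypothetical and
not part of the patch:

    import org.apache.asterix.common.api.INCLifecycleTask;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.service.IControllerService;

    // Hypothetical example: a startup step that only logs. Tasks are built on the CC and shipped
    // to the NC, which is why the interface extends Serializable.
    public class LogStartupStepTask implements INCLifecycleTask {

        private static final long serialVersionUID = 1L;
        private final String stepName;

        public LogStartupStepTask(String stepName) {
            this.stepName = stepName;
        }

        @Override
        public void start(IControllerService cs) throws HyracksDataException {
            System.out.println("Executing NC startup step: " + stepName);
        }
    }
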
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
new file mode 100644
index 0000000..003bae2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClusterStateManager {
+
+    /**
+     * @return The current cluster state.
+     */
+    ClusterState getState();
+
+    /**
+     * Updates the cluster state based on the state of all cluster partitions and the metadata node.
+     */
+    void updateClusterState() throws HyracksDataException;
+
+    /**
+     * Force the cluster state into {@code state}
+     */
+    void forceIntoState(ClusterState state);
+
+    /**
+     * Updates all partitions of {@code nodeId} based on the {@code active} flag.
+     * @param nodeId
+     * @param active
+     * @throws HyracksDataException
+     */
+    void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+
+    /**
+     * Updates the active node and active state of the cluster partition with id {@code partitionNum}
+     */
+    void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+
+    /**
+     * Updates the metadata node id and its state.
+     */
+    void updateMetadataNode(String nodeId, boolean active);
+
+    /**
+     * @return a map of nodeId and NC Configuration for active nodes.
+     */
+    Map<String, Map<String, String>> getActiveNcConfiguration();
+
+    /**
+     * @return The current metadata node Id.
+     */
+    String getCurrentMetadataNodeId();
+
+    ClusterPartition[] getNodePartitions(String nodeId);
+
+    ClusterPartition[] getClusterPartitions();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 81c5a6d..5921b09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -19,21 +19,27 @@
 package org.apache.asterix.common.config;
 
 import java.io.InputStream;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.event.schema.cluster.Replica;
+import org.apache.commons.lang3.StringUtils;
 
 public class ClusterProperties {
 
     public static final ClusterProperties INSTANCE = new ClusterProperties();
-
     private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
     private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-
-    private final Cluster cluster;
+    private String nodeNamePrefix = StringUtils.EMPTY;
+    private Cluster cluster;
 
     private ClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -42,11 +48,11 @@
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
+                nodeNamePrefix = cluster.getInstanceName() + "_";
+                updateNodeIdToFullName();
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
             }
-        } else {
-            cluster = null;
         }
     }
 
@@ -62,14 +68,46 @@
         return DEFAULT_STORAGE_DIR_NAME;
     }
 
-    public boolean isReplicationEnabled() {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            return cluster.getDataReplication().isEnabled();
-        }
-        return false;
+    public boolean isAutoFailoverEnabled() {
+        //TODO make it strategy based
+        return true;
     }
 
-    public boolean isAutoFailoverEnabled() {
-        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+    public Node getNodeById(String nodeId) {
+        Optional<Node> matchingNode = cluster.getNode().stream().filter(node -> node.getId().equals(nodeId)).findAny();
+        return matchingNode.isPresent() ? matchingNode.get() : null;
+    }
+
+    public int getNodeIndex(String nodeId) {
+        for (int i = 0; i < cluster.getNode().size(); i++) {
+            Node node = cluster.getNode().get(i);
+            if (node.getId().equals(nodeId)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public IReplicationStrategy getReplicationStrategy() {
+        return ReplicationStrategyFactory.create(cluster);
+    }
+
+    private String getNodeFullName(String nodeId) {
+        if (nodeId.startsWith(nodeNamePrefix)) {
+            return nodeId;
+        }
+        return nodeNamePrefix + nodeId;
+    }
+
+    private void updateNodeIdToFullName() {
+        cluster.getNode().forEach(node -> node.setId(getNodeFullName(node.getId())));
+        if (cluster.getMetadataNode() != null) {
+            cluster.setMetadataNode(getNodeFullName(cluster.getMetadataNode()));
+        }
+        if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getFaultTolerance() != null
+                && cluster.getHighAvailability().getFaultTolerance().getReplica() != null) {
+            Replica replicas = cluster.getHighAvailability().getFaultTolerance().getReplica();
+            replicas.setNodeId(replicas.getNodeId().stream().map(this::getNodeFullName).collect(Collectors.toList()));
+        }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index 164a525..cf2ce4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -18,11 +18,10 @@
  */
 package org.apache.asterix.common.config;
 
-import java.util.HashSet;
 import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -31,15 +30,7 @@
 
 public class ReplicationProperties extends AbstractProperties {
 
-    private static final Logger LOGGER = Logger.getLogger(ReplicationProperties.class.getName());
-
-
     private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
-
-    private static final String REPLICATION_ENABLED_KEY = "replication.enabled";
-
-    private static final String REPLICATION_FACTOR_KEY = "replication.factor";
-    private static final int REPLICATION_FACTOR_DEFAULT = 1;
 
     private static final String REPLICATION_TIMEOUT_KEY = "replication.timeout";
     private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
@@ -60,203 +51,58 @@
     private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
             StorageUnit.KILOBYTE);
 
-    private final String nodeNamePrefix;
     private final Cluster cluster;
+    private final IReplicationStrategy repStrategy;
 
     public ReplicationProperties(PropertiesAccessor accessor) {
         super(accessor);
         this.cluster = ClusterProperties.INSTANCE.getCluster();
-
-        if (cluster != null) {
-            nodeNamePrefix = cluster.getInstanceName() + "_";
-        } else {
-            nodeNamePrefix = "";
-        }
-    }
-
-    @PropertyKey(REPLICATION_ENABLED_KEY)
-    public boolean isReplicationEnabled() {
-        return ClusterProperties.INSTANCE.isReplicationEnabled();
+        this.repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
     }
 
     public String getReplicaIPAddress(String nodeId) {
-        if (cluster != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    return node.getClusterIp();
-                }
-            }
-        }
-        return NODE_IP_ADDRESS_DEFAULT;
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        return node != null ? node.getClusterIp() : NODE_IP_ADDRESS_DEFAULT;
     }
 
     public int getDataReplicationPort(String nodeId) {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
-                            : cluster.getDataReplication().getReplicationPort().intValue();
-                }
-            }
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        if (node != null) {
+            return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+                    : cluster.getHighAvailability().getDataReplication().getReplicationPort().intValue();
         }
         return REPLICATION_DATAPORT_DEFAULT;
     }
 
-    public Set<Replica> getRemoteReplicas(String nodeId) {
-        Set<Replica> remoteReplicas = new HashSet<>();;
-
-        int numberOfRemoteReplicas = getReplicationFactor() - 1;
-        //Using chained-declustering
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING,
-                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
-                return null;
-            }
-
-            //find nodes to the right of this node
-            for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
-                remoteReplicas.add(getReplicaByNodeIndex(i));
-                if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                    break;
-                }
-            }
-
-            //if not all remote replicas have been found, start from the beginning
-            if (remoteReplicas.size() != numberOfRemoteReplicas) {
-                for (int i = 0; i < cluster.getNode().size(); i++) {
-                    remoteReplicas.add(getReplicaByNodeIndex(i));
-                    if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                        break;
-                    }
-                }
-            }
-        }
-        return remoteReplicas;
-    }
-
-    private Replica getReplicaByNodeIndex(int nodeIndex) {
-        Node node = cluster.getNode().get(nodeIndex);
-        Node replicaNode = new Node();
-        replicaNode.setId(getRealCluserNodeID(node.getId()));
-        replicaNode.setClusterIp(node.getClusterIp());
-        return new Replica(replicaNode);
-    }
-
     public Replica getReplicaById(String nodeId) {
-        int nodeIndex = -1;
-        if (cluster != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        if (node != null) {
+            return new Replica(node);
         }
-
-        if (nodeIndex < 0) {
-            return null;
-        }
-
-        return getReplicaByNodeIndex(nodeIndex);
+        return null;
     }
 
     public Set<String> getRemoteReplicasIds(String nodeId) {
-        Set<String> remoteReplicasIds = new HashSet<>();
-        Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
-
-        for (Replica replica : remoteReplicas) {
-            remoteReplicasIds.add(replica.getId());
-        }
-
-        return remoteReplicasIds;
+        return repStrategy.getRemoteReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
     }
 
-    public String getRealCluserNodeID(String nodeId) {
-        return nodeNamePrefix + nodeId;
+    public Set<String> getRemotePrimaryReplicasIds(String nodeId) {
+        return repStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
     }
 
     public Set<String> getNodeReplicasIds(String nodeId) {
-        Set<String> replicaIds = new HashSet<>();
-        replicaIds.add(nodeId);
-        replicaIds.addAll(getRemoteReplicasIds(nodeId));
-        return replicaIds;
-    }
-
-    @PropertyKey(REPLICATION_FACTOR_KEY)
-    public int getReplicationFactor() {
-        if (cluster != null) {
-            if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
-                return REPLICATION_FACTOR_DEFAULT;
-            }
-            return cluster.getDataReplication().getReplicationFactor().intValue();
-        }
-        return REPLICATION_FACTOR_DEFAULT;
+        Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId);
+        // a node's replica set includes the node itself
+        remoteReplicasIds.add(nodeId);
+        return remoteReplicasIds;
     }
 
     @PropertyKey(REPLICATION_TIMEOUT_KEY)
     public int getReplicationTimeOut() {
         if (cluster != null) {
-            return cluster.getDataReplication().getReplicationTimeOut().intValue();
+            return cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue();
         }
         return REPLICATION_TIME_OUT_DEFAULT;
-    }
-
-    /**
-     * @param nodeId
-     * @return The set of nodes which replicate to this node, including the node itself
-     */
-    public Set<String> getNodeReplicationClients(String nodeId) {
-        Set<String> clientReplicas = new HashSet<>();
-        clientReplicas.add(nodeId);
-
-        int clientsCount = getReplicationFactor();
-
-        //Using chained-declustering backwards
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            //find nodes to the left of this node
-            for (int i = nodeIndex - 1; i >= 0; i--) {
-                clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                if (clientReplicas.size() == clientsCount) {
-                    break;
-                }
-            }
-
-            //if not all client replicas have been found, start from the end
-            if (clientReplicas.size() != clientsCount) {
-                for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
-                    clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                    if (clientReplicas.size() == clientsCount) {
-                        break;
-                    }
-                }
-            }
-        }
-        return clientReplicas;
     }
 
     @PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY)
@@ -281,4 +127,12 @@
         return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
                 PropertyInterpreters.getIntegerBytePropertyInterpreter());
     }
-}
+
+    public boolean isParticipant(String nodeId) {
+        return repStrategy.isParticipant(nodeId);
+    }
+
+    public IReplicationStrategy getReplicationStrategy() {
+        return repStrategy;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
new file mode 100644
index 0000000..ad326b2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class ChainedDeclusteringReplicationStrategy implements IReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName());
+    private int replicationFactor;
+
+    @Override
+    public boolean isMatch(int datasetId) {
+        return true;
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<>();
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        int numberOfRemoteReplicas = replicationFactor - 1;
+        int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId);
+
+        if (nodeIndex == -1) {
+            LOGGER.log(Level.WARNING, "Could not find node " + nodeId + " in cluster configurations");
+            return Collections.emptySet();
+        }
+
+        //find nodes to the right of this node
+        while (remoteReplicas.size() != numberOfRemoteReplicas) {
+            remoteReplicas.add(new Replica(cluster.getNode().get(++nodeIndex % cluster.getNode().size())));
+        }
+
+        return remoteReplicas;
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        Set<Replica> clientReplicas = new HashSet<>();
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        final int remotePrimaryReplicasCount = replicationFactor - 1;
+
+        int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId);
+
+        //find nodes to the left of this node (floorMod handles the wrap-around past index 0)
+        while (clientReplicas.size() != remotePrimaryReplicasCount) {
+            clientReplicas.add(new Replica(cluster.getNode().get(Math.floorMod(--nodeIndex, cluster.getNode().size()))));
+        }
+
+        return clientReplicas;
+    }
+
+    @Override
+    public ChainedDeclusteringReplicationStrategy from(Cluster cluster) {
+        if (cluster.getHighAvailability().getDataReplication().getReplicationFactor() == null) {
+            throw new IllegalStateException("Replication factor must be specified.");
+        }
+        ChainedDeclusteringReplicationStrategy cd = new ChainedDeclusteringReplicationStrategy();
+        cd.replicationFactor = cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue();
+        return cd;
+    }
+}
\ No newline at end of file
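
To make the chained-declustering neighbor selection above concrete, here is a small worked example
under assumed settings: a hypothetical three-node cluster n1, n2, n3 (in that order in cluster.xml)
with replication_factor = 2, i.e. one remote replica per node, so the chain is n1 -> n2 -> n3 -> n1:

    getRemoteReplicas("n1")        // { n2 } : n1 ships its data to its right-hand neighbor
    getRemotePrimaryReplicas("n1") // { n3 } : n3 ships its data to n1 (wrap-around past index 0)

In other words, getRemoteReplicas answers "whom do I replicate to" while getRemotePrimaryReplicas
answers "who replicates to me", and the two stay mirror images of each other around the chain.
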
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 9f9d74b..a593264 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRemoteRecoveryManager {
 
@@ -46,4 +48,6 @@
      * @throws InterruptedException
      */
     public void completeFailbackProcess() throws IOException, InterruptedException;
+
+    void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
new file mode 100644
index 0000000..f65f6ac
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public interface IReplicationStrategy {
+
+    /**
+     * @param datasetId
+     * @return True if the dataset should be replicated. Otherwise false.
+     */
+    boolean isMatch(int datasetId);
+
+    /**
+     * @param nodeId
+     * @return The set of nodes that replicate data on {@code nodeId}.
+     */
+    Set<Replica> getRemotePrimaryReplicas(String nodeId);
+
+    /**
+     * @param node
+     * @return The set of nodes that {@code nodeId} replicates data to.
+     */
+    Set<Replica> getRemoteReplicas(String node);
+
+    /**
+     * @param nodeId
+     * @return true if {@code nodeId} has any remote primary replica or remote replica. Otherwise false.
+     */
+    default boolean isParticipant(String nodeId) {
+        return !getRemoteReplicas(nodeId).isEmpty() || !getRemotePrimaryReplicas(nodeId).isEmpty();
+    }
+
+    /**
+     * @param cluster
+     * @return A replication strategy based on the passed configurations.
+     */
+    IReplicationStrategy from(Cluster cluster);
+}
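
A brief usage sketch of this interface as it is wired through ClusterProperties above; the class
and method names below are illustrative only, while getReplicationStrategy(), isParticipant() and
getRemoteReplicas() are the methods introduced by this patch:

    import java.util.Collections;
    import java.util.Set;

    import org.apache.asterix.common.config.ClusterProperties;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.asterix.common.replication.Replica;

    // Illustrative usage only, not part of this change.
    public class ReplicationStrategyUsage {

        public static Set<Replica> remoteReplicasFor(String nodeId) {
            IReplicationStrategy strategy = ClusterProperties.INSTANCE.getReplicationStrategy();
            // isMatch(datasetId) decides which datasets are replicated; isParticipant(nodeId) decides
            // whether this node takes part in replication at all.
            return strategy.isParticipant(nodeId) ? strategy.getRemoteReplicas(nodeId) : Collections.emptySet();
        }
    }
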
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
new file mode 100644
index 0000000..711f06d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class MetadataOnlyReplicationStrategy implements IReplicationStrategy {
+
+    private String metadataNodeId;
+    private Replica metadataPrimaryReplica;
+    private Set<Replica> metadataNodeReplicas;
+
+    @Override
+    public boolean isMatch(int datasetId) {
+        return datasetId < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId >= 0;
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        if (nodeId.equals(metadataNodeId)) {
+            return metadataNodeReplicas;
+        }
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        if (metadataNodeReplicas.stream().map(Replica::getId)
+                .anyMatch(replicaId -> replicaId.equals(nodeId))) {
+            return new HashSet<>(Arrays.asList(metadataPrimaryReplica));
+        }
+        return Collections.emptySet();
+    }
+
+    @Override
+    public MetadataOnlyReplicationStrategy from(Cluster cluster) {
+        if (cluster.getMetadataNode() == null) {
+            throw new IllegalStateException("Metadata node must be specified.");
+        }
+
+        Node metadataNode = ClusterProperties.INSTANCE.getNodeById(cluster.getMetadataNode());
+        if (metadataNode == null) {
+            throw new IllegalStateException("Invalid metadata node specified");
+        }
+
+        if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null
+                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null
+                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) {
+            throw new IllegalStateException("One or more replicas must be specified for metadata node.");
+        }
+
+        final Set<Replica> replicas = new HashSet<>();
+        for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
+            Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+            if (node == null) {
+                throw new IllegalStateException("Invalid replica specified: " + nodeId);
+            }
+            replicas.add(new Replica(node));
+        }
+        MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy();
+        st.metadataNodeId = cluster.getMetadataNode();
+        st.metadataPrimaryReplica = new Replica(metadataNode);
+        st.metadataNodeReplicas = replicas;
+        return st;
+    }
+}
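
A small check of the metadata-only matching rule (illustrative only, not part of the patch): dataset ids in [0, FIRST_AVAILABLE_USER_DATASET_ID) are treated as metadata indexes and replicated, while user dataset ids are not.

import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.MetadataOnlyReplicationStrategy;

public class MetadataOnlyMatchSketch {
    public static void main(String[] args) {
        MetadataOnlyReplicationStrategy strategy = new MetadataOnlyReplicationStrategy();
        int firstUserDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
        System.out.println(strategy.isMatch(0)); // true: metadata dataset
        System.out.println(strategy.isMatch(firstUserDatasetId - 1)); // true: still in the metadata range
        System.out.println(strategy.isMatch(firstUserDatasetId)); // false: first user dataset
        System.out.println(strategy.isMatch(-1)); // false: negative ids never match
    }
}
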
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
new file mode 100644
index 0000000..43347f6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class NoReplicationStrategy implements IReplicationStrategy {
+
+    @Override
+    public boolean isMatch(int datasetId) {
+        return false;
+    }
+
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return false;
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String node) {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public NoReplicationStrategy from(Cluster cluster) {
+        return new NoReplicationStrategy();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index bd77778..267a22d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -36,11 +36,13 @@
         UNKNOWN
     }
 
-    final Node node;
+    private final Node node;
     private ReplicaState state = ReplicaState.UNKNOWN;
 
     public Replica(Node node) {
-        this.node = node;
+        this.node = new Node();
+        this.node.setId(node.getId());
+        this.node.setClusterIp(node.getClusterIp());
     }
 
     public ReplicaState getState() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
new file mode 100644
index 0000000..fbe798d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class ReplicationStrategyFactory {
+
+    private static final Map<String, Class<? extends IReplicationStrategy>> BUILT_IN_REPLICATION_STRATEGY =
+            new HashMap<>();
+
+    static {
+        BUILT_IN_REPLICATION_STRATEGY.put("no_replication", NoReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", ChainedDeclusteringReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", MetadataOnlyReplicationStrategy.class);
+    }
+
+    private ReplicationStrategyFactory() {
+        throw new AssertionError();
+    }
+
+    public static IReplicationStrategy create(Cluster cluster) {
+        boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+                && cluster.getHighAvailability().getEnabled() != null
+                && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+        if (!highAvailabilityEnabled || cluster.getHighAvailability().getDataReplication() == null
+                || cluster.getHighAvailability().getDataReplication().getStrategy() == null) {
+            return new NoReplicationStrategy();
+        }
+        String strategyName = cluster.getHighAvailability().getDataReplication().getStrategy().toLowerCase();
+        if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
+            throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s",
+                    BUILT_IN_REPLICATION_STRATEGY.keySet().toString()));
+        }
+        Class<? extends IReplicationStrategy> clazz = BUILT_IN_REPLICATION_STRATEGY.get(strategyName);
+        try {
+            return clazz.newInstance().from(cluster);
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file
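
As a usage note for the factory above (a sketch, not part of the patch): the <strategy> element under <high_availability>/<data_replication> selects one of the three built-ins, and any missing or disabled configuration falls back to NoReplicationStrategy.

import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.NoReplicationStrategy;
import org.apache.asterix.common.replication.ReplicationStrategyFactory;
import org.apache.asterix.event.schema.cluster.Cluster;

public class StrategySelectionSketch {

    public static IReplicationStrategy select(Cluster cluster) {
        // Valid names: no_replication, chained_declustering, metadata_only.
        // Unknown names raise IllegalArgumentException listing the supported keys.
        IReplicationStrategy strategy = ReplicationStrategyFactory.create(cluster);
        if (strategy instanceof NoReplicationStrategy) {
            System.out.println("Replication is effectively disabled for this cluster");
        }
        return strategy;
    }
}
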
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
new file mode 100644
index 0000000..ca6968f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.File;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+
+/**
+ * A holder class for the properties of an index file.
+ */
+public class IndexFileProperties {
+
+    private final String fileName;
+    private final String idxName;
+    private final String dataverseName;
+    private final int partitionId;
+    private final int datasetId;
+
+    public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) {
+        this.partitionId = partitionId;
+        this.dataverseName = dataverseName;
+        this.idxName = idxName;
+        this.fileName = fileName;
+        this.datasetId = datasetId;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public String getIdxName() {
+        return idxName;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(StoragePathUtil.PARTITION_DIR_PREFIX).append(partitionId).append(File.separator);
+        sb.append(dataverseName).append(File.separator);
+        sb.append(idxName).append(File.separator);
+        sb.append(fileName);
+        sb.append(" [Dataset ID: ").append(datasetId).append(']');
+        return sb.toString();
+    }
+}
\ No newline at end of file
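
For reference, a tiny illustration of the holder above (hypothetical values, not part of the patch): toString() renders <partition dir>/<dataverse>/<index>/<file> followed by the dataset id.

import org.apache.asterix.common.storage.IndexFileProperties;

public class IndexFilePropertiesSketch {
    public static void main(String[] args) {
        // Constructor order: partitionId, dataverseName, idxName, fileName, datasetId.
        IndexFileProperties props = new IndexFileProperties(3, "MyDataverse", "ds_idx_ds", "2017-01-04_0.lsm", 101);
        // Prints something like: <partition dir prefix>3/MyDataverse/ds_idx_ds/2017-01-04_0.lsm [Dataset ID: 101]
        System.out.println(props);
    }
}
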
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 6816116..3e85276 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -34,6 +34,7 @@
 public interface IRecoveryManager {
 
     public enum SystemState {
+        INITIAL_RUN,
         NEW_UNIVERSE,
         RECOVERING,
         HEALTHY,
@@ -120,4 +121,6 @@
      * Deletes all temporary recovery files
      */
     public void deleteRecoveryTemporaryFiles();
+
+    void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 34af5c3..4f37c88 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -75,6 +75,17 @@
     }
 
     /**
+     * @param fileAbsolutePath
+     * @return The file's relative path starting from the partition directory
+     */
+    public static String getIndexFileRelativePath(String fileAbsolutePath) {
+        String[] tokens = fileAbsolutePath.split(File.separator);
+        //partition/dataverse/idx/fileName
+        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+    }
+
+    /**
      * Create a file
      * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
      * creating files simultaneously
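
A quick illustration of the relative-path helper added above (illustrative only; the absolute prefix is made up, only the trailing partition/dataverse/index/file segments matter, and '/' separators are assumed):

import org.apache.asterix.common.utils.StoragePathUtil;

public class RelativePathSketch {
    public static void main(String[] args) {
        String absolutePath = "/home/asterix/iodevice0/storage/partition_1/MyDataverse/ds_idx_ds/2017-01-04_0.lsm";
        // Keeps only the last four segments: partition_1/MyDataverse/ds_idx_ds/2017-01-04_0.lsm
        System.out.println(StoragePathUtil.getIndexFileRelativePath(absolutePath));
    }
}
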
diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
index 79c377a..098b4e7 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
@@ -44,7 +44,7 @@
     <xs:element name="http_port" type="xs:integer" />
     <xs:element name="debug_port" type="xs:integer" />
     <xs:element name="metadata_node" type="xs:string" />
-    <xs:element name="enabled" type="xs:boolean" />
+    <xs:element name="enabled" type="xs:string" />
     <xs:element name="replication_port" type="xs:integer" />
     <xs:element name="replication_factor" type="xs:integer" />
     <xs:element name="auto_failover" type="xs:boolean" />
@@ -57,6 +57,8 @@
     <xs:element name="result_time_to_live" type="xs:long" />
     <xs:element name="result_sweep_threshold" type="xs:long" />
     <xs:element name="cc_root" type="xs:string" />
+    <xs:element name="strategy" type="xs:string" />
+    <xs:element name="node_id" type="xs:string" />
 
     <!-- definition of complex elements -->
     <xs:element name="working_dir">
@@ -87,11 +89,29 @@
     <xs:element name="data_replication">
         <xs:complexType>
             <xs:sequence>
-                <xs:element ref="cl:enabled" />
+                <xs:element ref="cl:strategy" />
                 <xs:element ref="cl:replication_port" />
                 <xs:element ref="cl:replication_factor" />
-                <xs:element ref="cl:auto_failover" />
                 <xs:element ref="cl:replication_time_out" />
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="fault_tolerance">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:strategy" />
+                <xs:element ref="cl:replica" minOccurs="0"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="high_availability">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:enabled" minOccurs="0"/>
+                <xs:element ref="cl:data_replication" minOccurs="0"/>
+                <xs:element ref="cl:fault_tolerance" minOccurs="0"/>
             </xs:sequence>
         </xs:complexType>
     </xs:element>
@@ -136,6 +156,14 @@
         </xs:complexType>
     </xs:element>
 
+    <xs:element name="replica">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:node_id" maxOccurs="unbounded" />
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
     <xs:element name="cluster">
         <xs:complexType>
             <xs:sequence>
@@ -150,7 +178,7 @@
                 <xs:element ref="cl:iodevices" minOccurs="0" />
                 <xs:element ref="cl:working_dir" />
                 <xs:element ref="cl:metadata_node" />
-                <xs:element ref="cl:data_replication" minOccurs="0" />
+                <xs:element ref="cl:high_availability" minOccurs="0" />
                 <xs:element ref="cl:master_node" />
                 <xs:element ref="cl:node" maxOccurs="unbounded" />
                 <xs:element ref="cl:substitute_nodes" />
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 4037eaf..de09713 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -262,9 +262,9 @@
         boolean valid = true;
 
         //if replication is disabled, no need to validate the settings
-        if (cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
+        if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getDataReplication() != null) {
 
-            if (cluster.getDataReplication().getReplicationFactor() == null) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationFactor() == null) {
                 if (cluster.getNode().size() >= 3) {
                     LOGGER.warn("Replication factor not defined. Using default value (3) " + WARNING);
 
@@ -276,26 +276,30 @@
             }
 
             //replication factor = 1 means no replication
-            if (cluster.getDataReplication().getReplicationFactor().intValue() == 1) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue() == 1) {
                 LOGGER.warn("Replication factor is set to 1. Disabling data replication" + WARNING);
                 return true;
             }
 
-            if (cluster.getDataReplication().getReplicationFactor().intValue() > cluster.getNode().size()) {
-                LOGGER.fatal("Replication factor = " + cluster.getDataReplication().getReplicationFactor().intValue()
-                        + "  requires at least " + cluster.getDataReplication().getReplicationFactor().intValue()
+            if (cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue() > cluster.getNode()
+                    .size()) {
+                LOGGER.fatal("Replication factor = "
+                        + cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue()
+                        + "  requires at least "
+                        + cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue()
                         + " nodes in the cluster" + ERROR);
                 valid = false;
             }
 
-            if (cluster.getDataReplication().getReplicationPort() == null
-                    || cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationPort() == null || cluster
+                    .getHighAvailability().getDataReplication().getReplicationPort().toString().length() == 0) {
                 valid = false;
                 LOGGER.fatal("Replication data port not defined for data repliaction. " + ERROR);
             }
 
-            if (cluster.getDataReplication().getReplicationTimeOut() == null
-                    || (cluster.getDataReplication().getReplicationTimeOut().intValue() + "").length() == 0) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationTimeOut() == null || String
+                    .valueOf(cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue())
+                    .length() == 0) {
                 LOGGER.warn("Replication maximum wait time not defined. Using default value (60 seconds) " + WARNING);
             }
 
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
index e6a3547..954a311 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
@@ -43,13 +43,18 @@
 
   <metadata_node>nc1</metadata_node>
 
-  <data_replication>
+  <high_availability>
     <enabled>true</enabled>
-    <replication_port>2000</replication_port>
-    <replication_factor>2</replication_factor>
-    <auto_failover>true</auto_failover>
-    <replication_time_out>10</replication_time_out>
-  </data_replication>
+    <data_replication>
+      <strategy>chained_declustering</strategy>
+      <replication_port>2000</replication_port>
+      <replication_factor>2</replication_factor>
+      <replication_time_out>30</replication_time_out>
+    </data_replication>
+    <fault_tolerance>
+      <strategy>auto</strategy>
+    </fault_tolerance>
+  </high_availability>
 
   <master_node>
     <id>master</id>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 529a660..4fcc573 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -44,8 +44,8 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,6 +59,7 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -69,7 +70,6 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -122,7 +122,7 @@
         Map<String, ClusterPartition[]> nodePartitions =
                 ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
                         .getNodePartitions();
-        Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
+        Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
         List<Integer> clientsPartitions = new ArrayList<>();
         for (String clientId : nodeReplicationClients) {
             for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
@@ -385,7 +385,7 @@
                     filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
                     //start sending files
                     for (String filePath : filesList) {
-                        String relativeFilePath = PersistentLocalResourceRepository.getResourceRelativePath(filePath);
+                        String relativeFilePath = StoragePathUtil.getIndexFileRelativePath(filePath);
                         //if the file already exists on the requester, skip it
                         if (!requesterExistingFiles.contains(relativeFilePath)) {
                             try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index dc4a93a..b78e208 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -50,18 +50,20 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.replication.ReplicationJob;
+import org.apache.asterix.common.storage.IndexFileProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -103,7 +105,6 @@
     private final Map<Integer, Set<String>> jobCommitAcks;
     private final Map<Integer, ILogRecord> replicationJobsPendingAcks;
     private ByteBuffer dataBuffer;
-
     private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ;
     private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ;
 
@@ -124,8 +125,8 @@
     private ReplicationJobsProccessor replicationJobsProcessor;
     private final ReplicasEventsMonitor replicationMonitor;
     //dummy job used to stop ReplicationJobsProccessor thread.
-    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new ReplicationJob(
-            ReplicationJobType.METADATA, ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, null);
+    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new ReplicationJob(ReplicationJobType.METADATA,
+            ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, null);
     //used to identify the correct IP address when the node has multiple network interfaces
     private String hostIPAddressFirstOctet = null;
 
@@ -136,6 +137,8 @@
     private Future<? extends Object> txnLogReplicatorTask;
     private SocketChannel[] logsRepSockets;
     private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
+    private final IReplicationStrategy replicationStrategy;
+    private final PersistentLocalResourceRepository localResourceRepo;
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -144,35 +147,38 @@
             IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
         this.nodeId = nodeId;
         this.replicationProperties = replicationProperties;
+        replicationStrategy = replicationProperties.getReplicationStrategy();
         this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager;
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
-        this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
         this.logManager = logManager;
+        localResourceRepo = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
+                .getLocalResourceRepository();
+        this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
+        replicas = new HashMap<>();
         replicationJobsQ = new LinkedBlockingQueue<>();
         replicaEventsQ = new LinkedBlockingQueue<>();
         terminateJobsReplication = new AtomicBoolean(false);
         jobsReplicationSuspended = new AtomicBoolean(true);
         replicationSuspended = new AtomicBoolean(true);
-        replicas = new HashMap<>();
         jobCommitAcks = new ConcurrentHashMap<>();
         replicationJobsPendingAcks = new ConcurrentHashMap<>();
         shuttingDownReplicaIds = new HashSet<>();
         dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        replicationMonitor = new ReplicasEventsMonitor();
+        //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
+        Set<Replica> replicaNodes = replicationStrategy.getRemoteReplicas(nodeId);
 
         //Used as async listeners from replicas
         replicationListenerThreads = Executors.newCachedThreadPool();
         replicationJobsProcessor = new ReplicationJobsProccessor();
-        replicationMonitor = new ReplicasEventsMonitor();
 
         Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) asterixAppRuntimeContextProvider
                 .getAppContext()).getMetadataProperties().getNodePartitions();
-        //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
-        Set<Replica> replicaNodes = replicationProperties.getRemoteReplicas(nodeId);
         replica2PartitionsMap = new HashMap<>(replicaNodes.size());
         for (Replica replica : replicaNodes) {
-            replicas.put(replica.getNode().getId(), replica);
+            replicas.put(replica.getId(), replica);
             //for each remote replica, get the list of replication clients
-            Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(replica.getId());
+            Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(replica.getId());
             //get the partitions of each client
             List<Integer> clientPartitions = new ArrayList<>();
             for (String clientId : nodeReplicationClients) {
@@ -256,7 +262,6 @@
                 getAndInitNewPage();
             }
         }
-
         currentTxnLogBuffer.append(logRecord);
     }
 
@@ -278,7 +283,12 @@
             //all of the job's files belong to a single storage partition.
             //get any of them to determine the partition from the file path.
             String jobFile = job.getJobFiles().iterator().next();
-            int jobPartitionId = PersistentLocalResourceRepository.getResourcePartition(jobFile);
+            IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile);
+            if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
+                return;
+            }
+
+            int jobPartitionId = indexFileRef.getPartitionId();
 
             ByteBuffer responseBuffer = null;
             LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
@@ -443,7 +453,7 @@
 
     @Override
     public boolean isReplicationEnabled() {
-        return ClusterProperties.INSTANCE.isReplicationEnabled();
+        return replicationProperties.isParticipant(nodeId);
     }
 
     @Override
@@ -822,7 +832,6 @@
                 }
             }
         }
-
         //presume replicated
         return true;
     }
@@ -908,18 +917,37 @@
     public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
         //stop replication thread afters all jobs/logs have been processed
         suspendReplication(false);
-        //send shutdown event to remote replicas
-        sendShutdownNotifiction();
-        //wait until all shutdown events come from all remote replicas
-        synchronized (shuttingDownReplicaIds) {
-            while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
-                try {
-                    shuttingDownReplicaIds.wait(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+
+        /**
+         * If this node has any remote replicas, it needs to inform them
+         * that it is shutting down.
+         */
+        if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+            //send shutdown event to remote replicas
+            sendShutdownNotifiction();
+        }
+
+        /**
+         * If this node has any remote primary replicas, then it needs to wait
+         * until all of them send the shutdown notification.
+         */
+        // find active remote primary replicas
+        Set<String> activeRemotePrimaryReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                .map(Replica::getId).filter(getActiveReplicasIds()::contains).collect(Collectors.toSet());
+
+        if (!activeRemotePrimaryReplicas.isEmpty()) {
+            //wait until all shutdown events come from all remote primary replicas
+            synchronized (shuttingDownReplicaIds) {
+                while (!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+                    try {
+                        shuttingDownReplicaIds.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
+
         LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
         //close replication channel
         asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
@@ -996,6 +1024,9 @@
     public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException {
         long startLSN = logManager.getAppendLSN();
         Set<String> replicaIds = getActiveReplicasIds();
+        if (replicaIds.isEmpty()) {
+            return;
+        }
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 0dcdc7b..b95690b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -31,9 +31,9 @@
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -41,6 +41,7 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
@@ -61,8 +62,7 @@
         //1. identify which replicas reside in this node
         String localNodeId = runtimeContext.getTransactionSubsystem().getId();
 
-        Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
-
+        Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
         Map<String, Set<String>> recoveryCandidates = new HashMap<>();
         Map<String, Integer> candidatesScore = new HashMap<>();
 
@@ -124,16 +124,9 @@
     }
 
     @Override
-    public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
-        /**
-         * TODO even though the takeover is always expected to succeed,
-         * in case of any failure during the takeover, the CC should be
-         * notified that the takeover failed.
-         */
-        Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
+    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-
-        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitionsToTakeover);
+        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
         long readableSmallestLSN = logManager.getReadableSmallestLSN();
         if (minLSN < readableSmallestLSN) {
             minLSN = readableSmallestLSN;
@@ -141,7 +134,25 @@
 
         //replay logs > minLSN that belong to these partitions
         IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        recoveryManager.replayPartitionsLogs(partitionsToTakeover, logManager.getLogReader(true), minLSN);
+        try {
+            recoveryManager.replayPartitionsLogs(partitions, logManager.getLogReader(true), minLSN);
+            if (flush) {
+                runtimeContext.getDatasetLifecycleManager().flushAllDatasets();
+            }
+        } catch (IOException | ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
+        /**
+         * TODO even though the takeover is always expected to succeed,
+         * in case of any failure during the takeover, the CC should be
+         * notified that the takeover failed.
+         */
+        Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
+        replayReplicaPartitionLogs(partitionsToTakeover, false);
 
         //mark these partitions as active in this node
         PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
@@ -158,7 +169,6 @@
                 .getLocalResourceRepository();
         IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
 
-        failbackRecoveryReplicas = new HashMap<>();
         while (true) {
             //start recovery steps
             try {
@@ -209,8 +219,8 @@
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
-        Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties().getNodePartitions();
+        Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext).getMetadataProperties()
+                .getNodePartitions();
 
         /**
          * for each lost partition, get the remaining files from replicas
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index cce3dc4..6a2ebf6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,8 +38,8 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -285,7 +285,7 @@
                             partitionFiles.add(file.getAbsolutePath());
                         } else {
                             partitionFiles.add(
-                                    PersistentLocalResourceRepository.getResourceRelativePath(file.getAbsolutePath()));
+                                    StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
                         }
                     }
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index cb56c39..806e51f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -20,7 +20,7 @@
 
 import java.util.Set;
 
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -49,6 +49,6 @@
 
     @Override
     public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
+        FaultToleranceManager.INSTANCE.processCompleteFailbackResponse(this);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NCLifecycleTaskReportMessage.java
new file mode 100644
index 0000000..a5f3a8a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NCLifecycleTaskReportMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class NCLifecycleTaskReportMessage implements IApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final int requestId;
+    private final String nodeId;
+    private final boolean success;
+    private Exception exception;
+
+    public NCLifecycleTaskReportMessage(int requestId, String nodeId, boolean success) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.success = success;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        FaultToleranceManager.INSTANCE.processNCLifecycleTaskReport(this);
+    }
+
+    public int getRequestId() {
+        return requestId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index d87cd23..bca61d0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -20,7 +20,7 @@
 
 import java.util.Set;
 
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -40,7 +40,7 @@
 
     @Override
     public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
+        FaultToleranceManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsRequestMessage.java
new file mode 100644
index 0000000..79ab174
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsRequestMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReplayPartitionLogsRequestMessage implements IApplicationMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(ReplayPartitionLogsRequestMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final int requestId;
+    private final Set<Integer> partitions;
+
+    public ReplayPartitionLogsRequestMessage(int requestId, Set<Integer> partitions) {
+        this.requestId = requestId;
+        this.partitions = partitions;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        // Replay the logs for these partitions and flush any impacted dataset
+        appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
+
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        ReplayPartitionLogsResponseMessage response = new ReplayPartitionLogsResponseMessage(requestId, ncs.getId(),
+                partitions);
+        try {
+            broker.sendMessageToCC(response);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsResponseMessage.java
new file mode 100644
index 0000000..5c3bc65
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplayPartitionLogsResponseMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class ReplayPartitionLogsResponseMessage implements IApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final int requestId;
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public ReplayPartitionLogsResponseMessage(int requestId, String nodeId, Set<Integer> partitions) {
+        this.requestId = requestId;
+        this.partitions = partitions;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        FaultToleranceManager.INSTANCE.processReplayPartitionLogsResponse(this);
+    }
+
+    public int getRequestId() {
+        return requestId;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskRequestMessage.java
new file mode 100644
index 0000000..db32101
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskRequestMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartupTaskRequestMessage implements IApplicationMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final SystemState state;
+    private final String nodeId;
+
+    public StartupTaskRequestMessage(String nodeId, SystemState state) {
+        this.state = state;
+        this.nodeId = nodeId;
+    }
+
+    public static void send(NodeControllerService cs) throws HyracksDataException {
+        IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext().getApplicationObject();
+        try {
+            SystemState systemState = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
+            StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
+            ((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        FaultToleranceManager.INSTANCE.processStartupTaskRequest(this);
+    }
+
+    public SystemState getState() {
+        return state;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskResponseMessage.java
new file mode 100644
index 0000000..768f85e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/StartupTaskResponseMessage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartupTaskResponseMessage implements IApplicationMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final int requestId;
+    private final String nodeId;
+    private final List<INCLifecycleTask> tasks;
+
+    public StartupTaskResponseMessage(int requestId, String nodeId, List<INCLifecycleTask> tasks) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.tasks = tasks;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        boolean success = true;
+        HyracksDataException exception = null;
+        try {
+            for (INCLifecycleTask task : tasks) {
+                task.start(cs);
+            }
+        } catch (HyracksDataException e) {
+            success = false;
+            exception = e;
+        }
+        NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(requestId, nodeId, success);
+        result.setException(exception);
+        try {
+            broker.sendMessageToCC(result);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to CC", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    public int getRequestId() {
+        return requestId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file
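
For illustration, a minimal custom startup task might look like the sketch below. It is not part of this change; it assumes INCLifecycleTask declares start(IControllerService) throws HyracksDataException, as used in the loop above, and extends Serializable so the task list can travel inside a StartupTaskResponseMessage. The class name, package placement, and body are hypothetical.

    package org.apache.asterix.app.nc.task;

    import org.apache.asterix.common.api.INCLifecycleTask;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.service.IControllerService;

    public class PrintBannerTask implements INCLifecycleTask {

        private static final long serialVersionUID = 1L;

        @Override
        public void start(IControllerService cs) throws HyracksDataException {
            // Executed on the NC when StartupTaskResponseMessage.handle() walks the task list
            // that the CC-side fault tolerance strategy assembled for this node.
            System.out.println("NC startup sequence reached PrintBannerTask");
        }
    }

A CC-side strategy would append such a task to the list it returns for a node, and the NC reports success or failure back through NCLifecycleTaskReportMessage as shown above.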
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index d3c3502..bd887c4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -38,7 +38,7 @@
 
     @Override
     public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
+        FaultToleranceManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 3adc8e9..117ca8d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.util.FaultToleranceManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -50,7 +50,7 @@
 
     @Override
     public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
+        FaultToleranceManager.INSTANCE.processPartitionTakeoverResponse(this);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/FaultToleranceStrategyFactory.java
new file mode 100644
index 0000000..0f9226e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/FaultToleranceStrategyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class FaultToleranceStrategyFactory {
+
+    //    private static final Map<String, Class<? extends IFaultToleranceStrategy>> BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>();
+    //    static {
+    //        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class);
+    //        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", MetadataNodeFaultToleranceStrategy.class);
+    //        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", AutoFaultToleranceStrategy.class);
+    //    }
+    private static final Map<String, String> BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>();
+    static {
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance",
+                "org.apache.asterix.app.replication.NoFaultToleranceStrategy");
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node",
+                "org.apache.asterix.app.replication.MetadataNodeFaultToleranceStrategy");
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", "org.apache.asterix.app.replication.AutoFaultToleranceStrategy");
+    }
+
+    private FaultToleranceStrategyFactory() {
+        throw new AssertionError();
+    }
+
+    public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy,
+            IClusterStateManager clusterManager, ICCMessageBroker messageBroker) {
+        boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+                && cluster.getHighAvailability().getEnabled() != null
+                && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+        try {
+            if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null
+                    || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) {
+                return ((IFaultToleranceStrategy) Class
+                        .forName(BUILT_IN_FAULT_TOLERANCE_STRATEGY.get("no_fault_tolerance")).newInstance())
+                                .from(repStrategy, clusterManager, messageBroker);
+            }
+            String strategyName = cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase();
+            if (!BUILT_IN_FAULT_TOLERANCE_STRATEGY.containsKey(strategyName)) {
+                throw new IllegalArgumentException(
+                        String.format("Unsupported Fault Tolerance Strategy. Available strategies: %s",
+                                BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet().toString()));
+            }
+
+            Class<? extends IFaultToleranceStrategy> clazz = (Class<? extends IFaultToleranceStrategy>) Class
+                    .forName(BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName));
+            return clazz.newInstance().from(repStrategy, clusterManager, messageBroker);
+        } catch (InstantiationException | ClassNotFoundException | IllegalAccessException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file
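
For reference, the sketch below shows how this factory could be wired on the CC side; it mirrors the FaultToleranceManager constructor added later in this patch and assumes AppContextInfo and ClusterProperties are already initialized in the CC process. Illustrative only.

    package org.apache.asterix.runtime.util;

    import org.apache.asterix.common.config.ClusterProperties;
    import org.apache.asterix.common.messaging.api.ICCMessageBroker;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.asterix.runtime.replication.FaultToleranceStrategyFactory;
    import org.apache.asterix.runtime.replication.IFaultToleranceStrategy;

    public class FaultToleranceWiringSketch {

        public static IFaultToleranceStrategy wire() {
            IReplicationStrategy rep =
                    AppContextInfo.INSTANCE.getReplicationProperties().getReplicationStrategy();
            ICCMessageBroker broker =
                    (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
            // Falls back to "no_fault_tolerance" unless cluster.xml enables high availability
            // and names one of: no_fault_tolerance, metadata_node, auto.
            return FaultToleranceStrategyFactory.create(ClusterProperties.INSTANCE.getCluster(), rep,
                    ClusterStateManager.INSTANCE, broker);
        }
    }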
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/IFaultToleranceStrategy.java
new file mode 100644
index 0000000..9517d7e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/IFaultToleranceStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.replication;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplayPartitionLogsResponseMessage;
+import org.apache.asterix.runtime.message.StartupTaskRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFaultToleranceStrategy {
+
+    /**
+     * Defines the logic of an {@link IFaultToleranceStrategy} when a node joins the cluster.
+     *
+     * @param nodeId
+     * @throws HyracksDataException
+     */
+    void notifyNodeJoin(String nodeId) throws HyracksDataException;
+
+    /**
+     * Defines the logic of an {@link IFaultToleranceStrategy} when a node leaves the cluster.
+     *
+     * @param nodeId
+     * @throws HyracksDataException
+     */
+    void notifyNodeFailure(String nodeId) throws HyracksDataException;
+
+    /**
+     * Constructs a fault tolerance strategy.
+     *
+     * @param replicationStrategy
+     * @param clusterManager
+     * @param messageBroker
+     * @return
+     */
+    IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker);
+
+    default void process(CompleteFailbackResponseMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(PreparePartitionsFailbackResponseMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(TakeoverMetadataNodeResponseMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(TakeoverPartitionsResponseMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(ReplayPartitionLogsResponseMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    default void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
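
The default process(...) methods let a strategy implement only the message traffic it needs. A minimal strategy that merely tracks membership could look like the sketch below (hypothetical class, using the IClusterStateManager methods shown elsewhere in this patch); real strategies such as AutoFaultToleranceStrategy additionally answer StartupTaskRequestMessage and drive takeover/failback.

    package org.apache.asterix.runtime.replication;

    import org.apache.asterix.common.cluster.IClusterStateManager;
    import org.apache.asterix.common.messaging.api.ICCMessageBroker;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public class MembershipOnlyFaultToleranceStrategy implements IFaultToleranceStrategy {

        private IClusterStateManager clusterManager;

        @Override
        public void notifyNodeJoin(String nodeId) throws HyracksDataException {
            // Mark the node's partitions active and re-evaluate the cluster state.
            clusterManager.updateNodePartitions(nodeId, true);
            clusterManager.updateClusterState();
        }

        @Override
        public void notifyNodeFailure(String nodeId) throws HyracksDataException {
            // Mark the node's partitions inactive; no takeover or failback is attempted.
            clusterManager.updateNodePartitions(nodeId, false);
            clusterManager.updateClusterState();
        }

        @Override
        public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy,
                IClusterStateManager clusterManager, ICCMessageBroker messageBroker) {
            MembershipOnlyFaultToleranceStrategy ft = new MembershipOnlyFaultToleranceStrategy();
            ft.clusterManager = clusterManager;
            return ft;
        }
    }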
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index 3ba1965..6b35a2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -19,13 +19,11 @@
 package org.apache.asterix.runtime.util;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.logging.Level;
@@ -33,24 +31,11 @@
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.json.JSONException;
@@ -60,7 +45,7 @@
  * A holder class for properties related to the Asterix cluster.
  */
 
-public class ClusterStateManager {
+public class ClusterStateManager implements IClusterStateManager {
     /*
      * TODO: currently after instance restarts we require all nodes to join again,
      * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
@@ -69,9 +54,8 @@
 
     private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
     public static final ClusterStateManager INSTANCE = new ClusterStateManager();
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
     private static final String IO_DEVICES = "iodevices";
-    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
+    private final Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
 
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -82,32 +66,18 @@
 
     private Map<String, ClusterPartition[]> node2PartitionsMap = null;
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
 
-    private long clusterRequestId = 0;
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
     private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
 
     private ClusterStateManager() {
         cluster = ClusterProperties.INSTANCE.getCluster();
         // if this is the CC process
-        if (AppContextInfo.INSTANCE.initialized()
-                && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+        if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
             node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
             clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
             currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
-            replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-            if (autoFailover) {
-                pendingTakeoverRequests = new HashMap<>();
-                pendingProcessingFailbackPlans = new LinkedList<>();
-                planId2FailbackPlanMap = new HashMap<>();
-            }
         }
     }
 
@@ -115,30 +85,8 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
         }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
+        failedNodes.add(nodeId);
+        FaultToleranceManager.INSTANCE.notifyNodeFailure(nodeId);
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration)
@@ -147,46 +95,51 @@
             LOGGER.info("Registering configuration parameters for node id " + nodeId);
         }
         activeNcConfiguration.put(nodeId, configuration);
-
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
-
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
-        }
-        updateNodePartitions(nodeId, true);
+        failedNodes.remove(nodeId);
+        FaultToleranceManager.INSTANCE.notifyNodeJoin(nodeId);
     }
 
-    private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException {
+    @Override
+    public synchronized void forceIntoState(ClusterState state) {
+        this.state = state;
+        LOGGER.info("Cluster State is now " + state.name());
+    }
+
+    @Override
+    public void updateMetadataNode(String nodeId, boolean active) {
+        currentMetadataNode = nodeId;
+        metadataNodeActive = active;
+        if (active) {
+            LOGGER.info(String.format("Metadata node %s is now active", currentMetadataNode));
+        }
+    }
+
+    @Override
+    public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
             for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
+                updateClusterPartition(p.getPartitionId(), nodeId, active);
             }
-            resetClusterPartitionConstraint();
-            updateClusterState();
         }
     }
 
-    private synchronized void updateClusterState() throws HyracksDataException {
+    @Override
+    public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) {
+        ClusterPartition clusterPartition = clusterPartitions.get(partitionNum);
+        if (clusterPartition != null) {
+            // set the active node for this node's partitions
+            clusterPartition.setActive(active);
+            if (active) {
+                clusterPartition.setActiveNodeId(activeNode);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void updateClusterState() throws HyracksDataException {
+        resetClusterPartitionConstraint();
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 state = ClusterState.UNUSABLE;
@@ -194,20 +147,17 @@
                 return;
             }
         }
+
+        state = ClusterState.PENDING;
+        LOGGER.info("Cluster is now " + state);
+
         // if all storage partitions are active as well as the metadata node, then the cluster is active
         if (metadataNodeActive) {
-            state = ClusterState.PENDING;
-            LOGGER.info("Cluster is now " + state);
             AppContextInfo.INSTANCE.getMetadataBootstrap().init();
             state = ClusterState.ACTIVE;
             LOGGER.info("Cluster is now " + state);
             // start global recovery
             AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
-                processPendingFailbackPlans();
-            }
-        } else {
-            requestMetadataNodeTakeover();
         }
     }
 
@@ -230,6 +180,7 @@
         return ncConfig.get(IO_DEVICES).split(",");
     }
 
+    @Override
     public ClusterState getState() {
         return state;
     }
@@ -285,6 +236,7 @@
         return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
     }
 
+    @Override
     public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
         return node2PartitionsMap.get(nodeId);
     }
@@ -296,337 +248,13 @@
         return 0;
     }
 
+    @Override
     public synchronized ClusterPartition[] getClusterPartitons() {
         ArrayList<ClusterPartition> partitons = new ArrayList<>();
         for (ClusterPartition partition : clusterPartitions.values()) {
             partitons.add(partition);
         }
         return partitons.toArray(new ClusterPartition[] {});
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
-                        break;
-                    }
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            //For each replica, send a request to takeover the assigned partitions
-            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
-                String replica = entry.getKey();
-                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we tried to send the request to
-                     * has failed. When the failure notification arrives, we will send any pending request
-                     * that belongs to the failed NC to a different active replica.
-                     */
-                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
-                }
-            }
-        }
-    }
-
-    private boolean addActiveReplica(String replica, ClusterPartition partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-            return true;
-        }
-        return false;
-    }
-
-    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadataPartition node to register itself as the metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to send the request to
-             * has failed. When the failure notification arrives, a new NC will be assigned to
-             * the metadata partition and a new metadata node takeover request will be sent to it.
-             */
-            LOGGER.log(Level.WARNING,
-                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
-            throws HyracksDataException {
-        for (Integer partitonId : response.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(response.getNodeId());
-        }
-        pendingTakeoverRequests.remove(response.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
-            throws HyracksDataException {
-        currentMetadataNode = response.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) {
-                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback completes
-                     */
-                    String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE
-                            .getCCApplicationContext().getMessageBroker();
-                    handleFailbackRequests(plan, messageBroker);
-                    /**
-                     * wait until the current plan is completed before processing the next plan.
-                     * when the current one completes or is reverted, the cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will be processed.
-                     */
-                    break;
-                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
-            throws HyracksDataException {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
     }
 
     public synchronized boolean isMetadataNodeActive() {
@@ -654,9 +282,7 @@
                 }
             }
             nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? "FAILED"
-                    : allActive ? "ACTIVE"
-                    : anyActive ? "PARTIALLY_ACTIVE"
-                    : "INACTIVE");
+                    : allActive ? "ACTIVE" : anyActive ? "PARTIALLY_ACTIVE" : "INACTIVE");
             nodeJSON.put("partitions", partitions);
             stateDescription.accumulate("ncs", nodeJSON);
         }
@@ -670,4 +296,14 @@
         stateDescription.put("partitions", clusterPartitions);
         return stateDescription;
     }
+
+    @Override
+    public Map<String, Map<String, String>> getActiveNcConfiguration() {
+        return Collections.unmodifiableMap(activeNcConfiguration);
+    }
+
+    @Override
+    public String getCurrentMetadataNodeId() {
+        return currentMetadataNode;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/FaultToleranceManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/FaultToleranceManager.java
new file mode 100644
index 0000000..83f4a26
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/FaultToleranceManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplayPartitionLogsResponseMessage;
+import org.apache.asterix.runtime.message.StartupTaskRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.replication.FaultToleranceStrategyFactory;
+import org.apache.asterix.runtime.replication.IFaultToleranceStrategy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A singleton class that lives on the CC process and is responsible for
+ * receiving and passing messages to an {@link IFaultToleranceStrategy}.
+ */
+public class FaultToleranceManager {
+
+    public static final FaultToleranceManager INSTANCE = new FaultToleranceManager();
+
+    private final IFaultToleranceStrategy ftStrategy;
+    private final IReplicationStrategy replicationStrategy;
+    private final ICCMessageBroker messageBroker;
+
+    private FaultToleranceManager() {
+        replicationStrategy = AppContextInfo.INSTANCE.getReplicationProperties().getReplicationStrategy();
+        messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+        ftStrategy = FaultToleranceStrategyFactory.create(ClusterProperties.INSTANCE.getCluster(), replicationStrategy,
+                ClusterStateManager.INSTANCE, messageBroker);
+    }
+
+    public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        ftStrategy.notifyNodeJoin(nodeId);
+    }
+
+    public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        ftStrategy.notifyNodeFailure(nodeId);
+    }
+
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processReplayPartitionLogsResponse(ReplayPartitionLogsResponseMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processStartupTaskRequest(StartupTaskRequestMessage msg) throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+
+    public synchronized void processNCLifecycleTaskReport(NCLifecycleTaskReportMessage msg)
+            throws HyracksDataException {
+        ftStrategy.process(msg);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index d5b2563..47773b4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -43,8 +43,10 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.ReplicationJob;
+import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.transactions.Resource;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
@@ -483,20 +485,27 @@
         nodeActivePartitions.remove(partitonId);
     }
 
-    /**
-     * @param resourceAbsolutePath
-     * @return the resource relative path starting from the partition directory
-     */
-    public static String getResourceRelativePath(String resourceAbsolutePath) {
-        String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partition/dataverse/idx/fileName
-        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
-                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+    private static String getLocalResourceRelativePath(String absolutePath) {
+        final String[] tokens = absolutePath.split(File.separator);
+        // Format: storage_dir/partition/dataverse/idx
+        return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator
+                + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2];
     }
 
-    public static int getResourcePartition(String resourceAbsolutePath) {
-        String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partition/dataverse/idx/fileName
-        return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
+    public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException {
+        //TODO pass relative path
+        final String[] tokens = absoluteFilePath.split(File.separator);
+        if (tokens.length < 5) {
+            throw new HyracksDataException("Invalid index file path: " + absoluteFilePath);
+        }
+        String fileName = tokens[tokens.length - 1];
+        String index = tokens[tokens.length - 2];
+        String dataverse = tokens[tokens.length - 3];
+        String partition = tokens[tokens.length - 4];
+        int partitionId = StoragePathUtil.getPartitionNumFromName(partition);
+        String relativePath = getLocalResourceRelativePath(absoluteFilePath);
+        final LocalResource lr = get(relativePath);
+        int datasetId = lr == null ? -1 : ((Resource) lr.getResource()).datasetId();
+        return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId);
     }
 }
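
The token arithmetic above assumes index files live under <storage dir>/<partition>/<dataverse>/<index>/<file>. A small standalone example with made-up directory names (the actual storage directory name comes from the storage configuration):

    public class IndexPathSketch {

        public static void main(String[] args) {
            String absolutePath = "/lv_scratch/io0/storage/partition_1/MyDataverse/ds_idx/2017-01-04_0_b";
            String[] tokens = absolutePath.split("/");
            String fileName = tokens[tokens.length - 1];   // "2017-01-04_0_b"
            String index = tokens[tokens.length - 2];      // "ds_idx"
            String dataverse = tokens[tokens.length - 3];  // "MyDataverse"
            String partition = tokens[tokens.length - 4];  // "partition_1" -> partition number 1
            String storageDir = tokens[tokens.length - 5]; // "storage" -> start of the relative resource path
            System.out.println(String.join(" | ", storageDir, partition, dataverse, index, fileName));
        }
    }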
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index c81225f..fd64b8e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,8 +18,12 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -30,15 +34,37 @@
 public class LogManagerWithReplication extends LogManager {
 
     private IReplicationManager replicationManager;
+    private final IReplicationStrategy replicationStrategy;
+    private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
 
-    public LogManagerWithReplication(TransactionSubsystem txnSubsystem) {
+    public LogManagerWithReplication(TransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) {
         super(txnSubsystem);
+        this.replicationStrategy = replicationStrategy;
     }
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        //only locally generated logs should be replicated
-        logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
+        boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+        if (shouldReplicate) {
+            switch (logRecord.getLogType()) {
+                case LogType.ENTITY_COMMIT:
+                case LogType.UPSERT_ENTITY_COMMIT:
+                case LogType.UPDATE:
+                case LogType.FLUSH:
+                    shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId());
+                    if (shouldReplicate && !replicatedJob.contains(logRecord.getJobId())) {
+                        replicatedJob.add(logRecord.getJobId());
+                    }
+                    break;
+                case LogType.JOB_COMMIT:
+                case LogType.ABORT:
+                    shouldReplicate = replicatedJob.remove(logRecord.getJobId());
+                    break;
+                default:
+                    shouldReplicate = false;
+            }
+        }
+        logRecord.setReplicated(shouldReplicate);
 
         //Remote flush logs do not need to be flushed separately since they may not trigger local flush
         if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
@@ -74,7 +100,8 @@
                     }
 
                     //wait for job Commit/Abort ACK from replicas
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+                    if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT
+                            || logRecord.getLogType() == LogType.ABORT)) {
                         while (!replicationManager.hasBeenReplicated(logRecord)) {
                             try {
                                 logRecord.wait();
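
The net effect of the filter above: locally generated data records (UPDATE, ENTITY_COMMIT, UPSERT_ENTITY_COMMIT, FLUSH) are replicated only when the configured IReplicationStrategy matches their dataset, and a job's terminal JOB_COMMIT/ABORT is replicated (and awaited) only if that job produced at least one replicated record. A condensed, self-contained restatement of that decision (the log type constants below are stand-ins for org.apache.asterix.common.transactions.LogType, not the real values):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.IntPredicate;

    public class ReplicationFilterSketch {

        static final byte UPDATE = 0, JOB_COMMIT = 1, ENTITY_COMMIT = 2, ABORT = 3, FLUSH = 4, UPSERT_ENTITY_COMMIT = 5;

        private final Set<Integer> replicatedJobs = ConcurrentHashMap.newKeySet();
        private final IntPredicate datasetMatches; // stands in for IReplicationStrategy.isMatch(datasetId)

        ReplicationFilterSketch(IntPredicate datasetMatches) {
            this.datasetMatches = datasetMatches;
        }

        boolean shouldReplicate(byte logType, int datasetId, int jobId) {
            switch (logType) {
                case ENTITY_COMMIT:
                case UPSERT_ENTITY_COMMIT:
                case UPDATE:
                case FLUSH:
                    boolean match = datasetMatches.test(datasetId);
                    if (match) {
                        replicatedJobs.add(jobId); // remember the job so its commit/abort is replicated too
                    }
                    return match;
                case JOB_COMMIT:
                case ABORT:
                    return replicatedJobs.remove(jobId); // replicate only if the job had replicated records
                default:
                    return false;
            }
        }

        public static void main(String[] args) {
            // Assume only dataset 5 is covered by the replication strategy.
            ReplicationFilterSketch f = new ReplicationFilterSketch(dsId -> dsId == 5);
            System.out.println(f.shouldReplicate(UPDATE, 5, 100));      // true  -> job 100 remembered
            System.out.println(f.shouldReplicate(UPDATE, 9, 200));      // false -> dataset 9 not replicated
            System.out.println(f.shouldReplicate(JOB_COMMIT, -1, 100)); // true  -> replicas must ACK
            System.out.println(f.shouldReplicate(JOB_COMMIT, -1, 200)); // false -> no ACK wait
        }
    }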
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f8b6384..15dad9f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -43,7 +43,8 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -92,7 +93,9 @@
     public RecoveryManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+        ReplicationProperties repProperties = ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext()).getReplicationProperties();
+        replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
@@ -109,6 +112,10 @@
      */
     @Override
     public SystemState getSystemState() throws ACIDException {
+        // TODO we need to know initial run vs NEW_UNIVERSE
+        //        if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isInitialRun()) {
+        //            return SystemState.INITIAL_RUN;
+        //        }
         //read checkpoint file
         Checkpoint checkpointObject = checkpointManager.getLatest();
         if (checkpointObject == null) {
@@ -125,15 +132,12 @@
             if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                 //no logs exist
                 state = SystemState.HEALTHY;
-                return state;
             } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) {
                 //only remote logs exist
                 state = SystemState.HEALTHY;
-                return state;
             } else {
                 //need to perform remote recovery
                 state = SystemState.CORRUPTED;
-                return state;
             }
         } else {
             long readableSmallestLSN = logMgr.getReadableSmallestLSN();
@@ -143,16 +147,14 @@
                     //No choice but continuing when the log files are lost.
                 }
                 state = SystemState.HEALTHY;
-                return state;
             } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
                     && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                 state = SystemState.HEALTHY;
-                return state;
             } else {
                 state = SystemState.CORRUPTED;
-                return state;
             }
         }
+        return state;
     }
 
     //This method is used only when replication is disabled.
@@ -177,6 +179,25 @@
     }
 
     @Override
+    public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
+        state = SystemState.RECOVERING;
+        LOGGER.log(Level.INFO, "starting recovery ...");
+
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        Checkpoint checkpointObject = checkpointManager.getLatest();
+        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
+        }
+
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
+
+        //get active partitions on this node
+        replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+    }
+
+    @Override
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
             throws IOException, ACIDException {
         try {
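
[Reviewer note, not part of the patch] A rough sketch of the startLocalRecovery flow added above, with placeholder interfaces rather than the real RecoveryManager collaborators: the replay start point is the checkpoint's minimum first-LSN clamped up to the smallest readable LSN, and only logs of the supplied partitions are replayed.

import java.io.IOException;
import java.util.Set;

// Placeholder interfaces standing in for the log manager, checkpoint, and log replay.
final class LocalRecoverySketch {

    interface LogSource {
        long readableSmallestLsn();
    }

    interface LatestCheckpoint {
        long minMctFirstLsn();
    }

    interface PartitionReplayer {
        void replay(Set<Integer> partitions, long fromLsn) throws IOException;
    }

    private final LogSource logs;
    private final LatestCheckpoint checkpoint;
    private final PartitionReplayer replayer;

    LocalRecoverySketch(LogSource logs, LatestCheckpoint checkpoint, PartitionReplayer replayer) {
        this.logs = logs;
        this.checkpoint = checkpoint;
        this.replayer = replayer;
    }

    /** Replays committed work for the given partitions, mirroring startLocalRecovery above. */
    void startLocalRecovery(Set<Integer> partitions) throws IOException {
        // Start from the checkpoint's low-water mark, but never before the
        // smallest LSN that is still readable from the existing log files.
        long lowWaterMarkLsn = Math.max(checkpoint.minMctFirstLsn(), logs.readableSmallestLsn());
        // (The real method also deletes temporary files left by failed recovery attempts here.)
        replayer.replay(partitions, lowWaterMarkLsn);
    }
}
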
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09183fe..6a59a9e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,11 +22,11 @@
 import java.util.concurrent.Future;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -70,7 +70,11 @@
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
-        final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+        ReplicationProperties repProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider
+                .getAppContext()).getReplicationProperties();
+        IReplicationStrategy replicationStrategy = repProperties.getReplicationStrategy();
+        final boolean replicationEnabled = repProperties.isParticipant(id);
+
         final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id);
         checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
         final Checkpoint latestCheckpoint = checkpointManager.getLatest();
@@ -80,14 +84,8 @@
                             latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
         }
 
-        ReplicationProperties asterixReplicationProperties = null;
-        if (asterixAppRuntimeContextProvider != null) {
-            asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider
-                    .getAppContext()).getReplicationProperties();
-        }
-
-        if (asterixReplicationProperties != null && replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this);
+        if (replicationEnabled) {
+            this.logManager = new LogManagerWithReplication(this, replicationStrategy);
         } else {
             this.logManager = new LogManager(this);
         }
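
[Reviewer note, not part of the patch] To make the constructor change easier to follow, a hedged sketch of the selection logic: the node id is checked against the replication configuration, and only participants get the replication-aware log manager. ReplicationConfig, LogWriter and the two concrete classes are illustrative names only, not the real asterix-transactions types.

// Illustrative sketch of the per-node log-manager selection in the constructor above.
final class LogManagerSelectionSketch {

    interface LogWriter { /* append, flush, ... */ }

    static final class PlainLogWriter implements LogWriter { }

    static final class ReplicatedLogWriter implements LogWriter {
        ReplicatedLogWriter(String strategyName) {
            // The real LogManagerWithReplication receives an IReplicationStrategy instead.
        }
    }

    // Stand-in for ReplicationProperties: which nodes participate, and under which strategy.
    interface ReplicationConfig {
        boolean isParticipant(String nodeId);
        String strategyName();
    }

    static LogWriter create(String nodeId, ReplicationConfig config) {
        // Replication is no longer a global cluster flag; it is decided per node
        // by the configured replication strategy.
        return config.isParticipant(nodeId)
                ? new ReplicatedLogWriter(config.strategyName())
                : new PlainLogWriter();
    }
}
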
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 4126c19..0094fa3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -350,7 +350,7 @@
     public long getLSNOffset() throws HyracksDataException {
         int metadataPageNum = getMetadataPageId();
         if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
-            return ((long) metadataPageNum * bufferCache.getPageSizeWithHeader()) + LIFOMetaDataFrame.LSN_OFFSET;
+            return ((long) metadataPageNum * bufferCache.getPageSize()) + LIFOMetaDataFrame.LSN_OFFSET;
         }
         return IMetadataPageManager.Constants.INVALID_LSN_OFFSET;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 6cedb4d..85f6451 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -366,7 +366,8 @@
     public long getLSNOffset() throws HyracksDataException {
         int metadataPageNum = getMetadataPageId();
         if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
-            return ((long) metadataPageNum * bufferCache.getPageSizeWithHeader()) + LIFOMetaDataFrame.LSN_OFFSET;
+            //TODO handle case for large pages
+            return ((long) metadataPageNum * bufferCache.getPageSize()) + LIFOMetaDataFrame.LSN_OFFSET;
         }
         return IMetadataPageManager.Constants.INVALID_LSN_OFFSET;
     }
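
[Reviewer note, not part of the patch] A tiny worked example of the arithmetic behind the two page-manager hunks: the LSN's file offset is the metadata page number times the page size plus the LSN's in-page offset, and the hunks switch from the header-inclusive page size to the plain page size. The constants below are invented purely to show the drift the old formula would introduce; they are not the real buffer-cache or LIFOMetaDataFrame values.

// Example-only values; the real constants come from the buffer cache and LIFOMetaDataFrame.
final class LsnOffsetSketch {

    static long lsnFileOffset(long metadataPageNum, int pageSize, int lsnOffsetInPage) {
        // Offset of the metadata page in the file, plus the LSN's offset inside that page.
        return metadataPageNum * pageSize + lsnOffsetInPage;
    }

    public static void main(String[] args) {
        int pageSize = 131072;       // hypothetical page size (128 KB)
        int headerSize = 64;         // hypothetical per-page header size
        int lsnOffsetInPage = 24;    // hypothetical LSN offset within the metadata frame
        long metadataPageNum = 3;

        long withHeader = lsnFileOffset(metadataPageNum, pageSize + headerSize, lsnOffsetInPage);
        long withoutHeader = lsnFileOffset(metadataPageNum, pageSize, lsnOffsetInPage);
        // Using the header-inclusive size shifts the offset by pageNum * headerSize bytes.
        System.out.println("with header: " + withHeader + ", without header: " + withoutHeader
                + ", drift: " + (withHeader - withoutHeader));
    }
}
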

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1414
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I074c48bb1994c9d2be8706e9a2ac1eab97756fc2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hubailmor@gmail.com>

