asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [4/6] asterixdb git commit: Introduce Strategy Based Replication and Fault-Tolerance
Date Sun, 19 Feb 2017 20:30:30 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index bc270df..ce57648 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -18,27 +18,17 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
@@ -47,39 +37,39 @@ import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
-import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
 
-    @Option(name = "-initial-run",
-            usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
+    @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started "
+            + "(default: false)", required = false)
     public boolean initialRun = false;
 
-    @Option(name = "-virtual-NC",
-            usage = "A flag indicating if this NC is running on virtual cluster " + "(default: false)",
-            required = false)
+    @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster "
+            + "(default: false)", required = false)
     public boolean virtualNC = false;
 
     private INCApplicationContext ncApplicationContext = null;
     private IAppRuntimeContext runtimeContext;
     private String nodeId;
-    private boolean isMetadataNode = false;
     private boolean stopInitiated = false;
-    private SystemState systemState = SystemState.NEW_UNIVERSE;
-    private boolean pendingFailbackCompletion = false;
+    private SystemState systemState;
     private IMessageBroker messageBroker;
 
     @Override
@@ -93,8 +83,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             parser.printUsage(System.err);
             throw e;
         }
-        ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
-                ncAppCtx.getLifeCycleComponentManager()));
+        ncAppCtx.setThreadFactory(
+                new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -104,12 +94,11 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
 
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", (controllerService)
-                    .getConfiguration().clusterNetPublicIPAddress);
+            System.setProperty("java.rmi.server.hostname",
+                    (controllerService).getConfiguration().clusterNetPublicIPAddress);
         }
         runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
@@ -118,69 +107,35 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         }
         runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
-        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext)
-                .getMessagingProperties();
+        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext).getMessagingProperties();
         messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         ncApplicationContext.setMessageBroker(messageBroker);
         MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
                 (NCMessageBroker) messageBroker, messagingProperties);
         ncApplicationContext.setMessagingChannelInterfaceFactory(interfaceFactory);
 
-        boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-        boolean autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-        if (initialRun) {
-            LOGGER.info("System is being initialized. (first run)");
-        } else {
-            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-            systemState = recoveryMgr.getSystemState();
+        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        systemState = recoveryMgr.getSystemState();
 
+        if (systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("System is in a state: " + systemState);
-            }
-
-            //do not attempt to perform remote recovery if this is a virtual NC
-            if (autoFailover && !virtualNC) {
-                if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
-                    //Start failback process
-                    IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
-                    remoteRecoveryMgr.startFailbackProcess();
-                    systemState = SystemState.RECOVERING;
-                    pendingFailbackCompletion = true;
-                }
-            } else {
-                //recover if the system is corrupted by checking system state.
-                if (systemState == SystemState.CORRUPTED) {
-                    recoveryMgr.startRecovery(true);
-                }
+                LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
+                LOGGER.info("Node ID: " + nodeId);
+                LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
+                LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
             }
+            PersistentLocalResourceRepository localResourceRepository =
+                    (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
+            localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
-        /**
-         * if the node pending failback completion, the replication channel
-         * should not be opened to avoid other nodes connecting to it before
-         * the node completes its failback. CC will notify other replicas once
-         * this node is ready to receive replication requests.
-         */
-        if (replicationEnabled && !pendingFailbackCompletion) {
-            startReplicationService();
-        }
+        performLocalCleanUp();
     }
 
     protected List<AsterixExtension> getExtensions() {
         return Collections.emptyList();
     }
 
-    private void startReplicationService() throws InterruptedException {
-        //Open replication channel
-        runtimeContext.getReplicationChannel().start();
-
-        //Check the state of remote replicas
-        runtimeContext.getReplicationManager().initializeReplicasState();
-
-        //Start replication after the state of remote replicas has been initialized.
-        runtimeContext.getReplicationManager().startReplicationThreads();
-    }
-
     @Override
     public void stop() throws Exception {
         if (!stopInitiated) {
@@ -204,63 +159,13 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
     @Override
     public void notifyStartupComplete() throws Exception {
-        //Send max resource id on this NC to the CC
-        ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
-        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
-                LOGGER.info("Node ID: " + nodeId);
-                LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
-                LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
-            }
-
-            PersistentLocalResourceRepository localResourceRepository =
-                    (PersistentLocalResourceRepository) runtimeContext
-                            .getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
-        }
-
-        isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
-        if (isMetadataNode && !pendingFailbackCompletion) {
-            runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
+        // AsterixHyracksIntegrationUtil does not pass the initial-run flag, so the virtualNC flag is used instead
+        if (systemState == SystemState.NEW_UNIVERSE && (initialRun || virtualNC)) {
+            systemState = SystemState.INITIAL_RUN;
         }
-        ExternalLibraryUtils.setUpExternaLibraries(runtimeContext.getLibraryManager(),
-                isMetadataNode && !pendingFailbackCompletion);
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting lifecycle components");
-        }
-
-        Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
-        String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
-        String dumpPath = metadataProperties.getCoredumpPath(nodeId);
-        lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Coredump directory for NC is: " + dumpPath);
-        }
-        ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
-        lccm.configure(lifecycleMgmtConfiguration);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Configured:" + lccm);
-        }
-        ncApplicationContext.setStateDumpHandler(
-                new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm));
-
-        lccm.startAll();
-
-        if (!pendingFailbackCompletion) {
-            ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
-            checkpointMgr.doSharpCheckpoint();
-
-            if (isMetadataNode) {
-                runtimeContext.exportMetadataNodeStub();
-            }
-        }
-
-        //Clean any temporary files
-        performLocalCleanUp();
+        // Request startup tasks from CC
+        StartupTaskRequestMessage.send((NodeControllerService) ncApplicationContext.getControllerService(),
+                systemState);
     }
 
     @Override
@@ -296,8 +201,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     }
 
     private void updateOnNodeJoin() {
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(nodeId)) {
             metadataProperties.getNodeNames().add(nodeId);
             Cluster cluster = ClusterProperties.INSTANCE.getCluster();
@@ -305,8 +209,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 throw new IllegalStateException("No cluster configuration found for this instance");
             }
             String asterixInstanceName = metadataProperties.getInstanceName();
-            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext)
-                    .getTransactionProperties();
+            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext).getTransactionProperties();
             Node self = null;
             List<Node> nodes;
             if (cluster.getSubstituteNodes() != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
new file mode 100644
index 0000000..0a9a215
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+
+public class FaultToleranceUtil {
+
+    private static final Logger LOGGER = Logger.getLogger(FaultToleranceUtil.class.getName());
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+
+    private FaultToleranceUtil() {
+        throw new AssertionError();
+    }
+
+    public static void notifyImpactedReplicas(String nodeId, ClusterEventType event,
+            IClusterStateManager clusterManager, ICCMessageBroker messageBroker,
+            IReplicationStrategy replicationStrategy) {
+        List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                .map(Replica::getId).collect(Collectors.toList());
+        String nodeIdAddress = StringUtils.EMPTY;
+        Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+        // In case the node joined with a new IP address, we need to send it to the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+        for (String replica : primaryRemoteReplicas) {
+            // If the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/resources/cluster.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 8f0b694..7f78b26 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -19,14 +19,22 @@
 <cluster xmlns="cluster">
   <instance_name>asterix</instance_name>
   <store>storage</store>
+  <metadata_node>nc1</metadata_node>
 
-  <data_replication>
+  <high_availability>
     <enabled>false</enabled>
-    <replication_port>2016</replication_port>
-    <replication_factor>2</replication_factor>
-    <auto_failover>false</auto_failover>
-    <replication_time_out>30</replication_time_out>
-  </data_replication>
+    <data_replication>
+      <strategy>metadata_only</strategy>
+      <replication_port>2016</replication_port>
+      <replication_time_out>30</replication_time_out>
+    </data_replication>
+    <fault_tolerance>
+       <strategy>metadata_node</strategy>
+       <replica>
+         <node_id>nc2</node_id>
+       </replica>
+    </fault_tolerance>
+  </high_availability>
 
   <master_node>
     <id>master</id>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
index 74e019b..a614faa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
@@ -1,11 +1,9 @@
 {
   "config" : {
-    "enabled" : false,
-    "factor" : 2,
     "log.batchsize" : 4096,
     "log.buffer.numpages" : 8,
     "log.buffer.pagesize" : 131072,
     "max.remote.recovery.attempts" : 5,
     "timeout" : 30
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 04e30ff..ae19f23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
@@ -102,4 +103,12 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
      * @throws HyracksDataException
      */
     void closeUserDatasets() throws HyracksDataException;
+
+    /**
+     * Flushes all open datasets that match {@code replicationStrategy}.
+     *
+     * @param replicationStrategy
+     * @throws HyracksDataException
+     */
+    void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
new file mode 100644
index 0000000..c30e999
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+@FunctionalInterface
+public interface INCLifecycleTask extends Serializable {
+
+    /**
+     * Performs the task.
+     *
+     * @param cs
+     * @throws HyracksDataException
+     */
+    void perform(IControllerService cs) throws HyracksDataException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
new file mode 100644
index 0000000..d971f48
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClusterStateManager {
+
+    /**
+     * @return The current cluster state.
+     */
+    ClusterState getState();
+
+    /**
+     * Updates the cluster state based on the state of all cluster partitions and the metadata node.
+     * Cluster state after refresh:
+     * ACTIVE: all cluster partitions are active and the metadata node is bound.
+     * PENDING: all cluster partitions are active but the metadata node is not bound.
+     * UNUSABLE: one or more cluster partitions are not active.
+     */
+    void refreshState() throws HyracksDataException;
+
+    /**
+     * Sets the cluster state to {@code state}.
+     */
+    void setState(ClusterState state);
+
+    /**
+     * Updates all partitions of {@code nodeId} based on the {@code active} flag.
+     * @param nodeId
+     * @param active
+     * @throws HyracksDataException
+     */
+    void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+
+    /**
+     * Updates the active node and active state of the cluster partition with id {@code partitionNum}
+     */
+    void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+
+    /**
+     * Updates the metadata node id and its state.
+     */
+    void updateMetadataNode(String nodeId, boolean active);
+
+    /**
+     * @return a map from node id to NC configuration for the active nodes.
+     */
+    Map<String, Map<String, String>> getActiveNcConfiguration();
+
+    /**
+     * @return The current metadata node Id.
+     */
+    String getCurrentMetadataNodeId();
+
+    /**
+     * @param nodeId
+     * @return The partitions originally assigned to the node.
+     */
+    ClusterPartition[] getNodePartitions(String nodeId);
+
+    /**
+     * @return A copy of the current state of the cluster partitions.
+     */
+    ClusterPartition[] getClusterPartitons();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 81c5a6d..980ad24 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -19,21 +19,27 @@
 package org.apache.asterix.common.config;
 
 import java.io.InputStream;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.event.schema.cluster.Replica;
+import org.apache.commons.lang3.StringUtils;
 
 public class ClusterProperties {
 
     public static final ClusterProperties INSTANCE = new ClusterProperties();
-
     private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
     private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-
-    private final Cluster cluster;
+    private String nodeNamePrefix = StringUtils.EMPTY;
+    private Cluster cluster;
 
     private ClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -42,11 +48,11 @@ public class ClusterProperties {
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
+                nodeNamePrefix = cluster.getInstanceName() + "_";
+                updateNodeIdToFullName();
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
             }
-        } else {
-            cluster = null;
         }
     }
 
@@ -62,14 +68,41 @@ public class ClusterProperties {
         return DEFAULT_STORAGE_DIR_NAME;
     }
 
-    public boolean isReplicationEnabled() {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            return cluster.getDataReplication().isEnabled();
+    /**
+     * Looks up a node of the loaded cluster configuration by its id.
+     *
+     * @param nodeId
+     *            the node id to search for
+     * @return a {@link Node} whose id equals {@code nodeId}, or {@code null} if no cluster configuration was
+     *         loaded or no node has that id
+     */
+    public Node getNodeById(String nodeId) {
+        if (cluster == null) {
+            // no cluster.xml was found, so there is nothing to search; null lets callers use their defaults
+            return null;
+        }
+        return cluster.getNode().stream().filter(node -> node.getId().equals(nodeId)).findAny().orElse(null);
+    }
+
+    /**
+     * Finds the position of a node in the cluster configuration's node list.
+     *
+     * @param nodeId
+     *            the node id to search for
+     * @return the zero-based index of the first node whose id equals {@code nodeId}, or {@code -1} if no cluster
+     *         configuration was loaded or no node has that id
+     */
+    public int getNodeIndex(String nodeId) {
+        if (cluster == null) {
+            // no cluster.xml was found; report "not found" instead of dereferencing null
+            return -1;
+        }
+        for (int i = 0; i < cluster.getNode().size(); i++) {
+            if (cluster.getNode().get(i).getId().equals(nodeId)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Asks {@link ReplicationStrategyFactory} for the replication strategy of the loaded cluster configuration.
+     * The factory is invoked on every call.
+     *
+     * @return the strategy returned by the factory
+     */
+    public IReplicationStrategy getReplicationStrategy() {
+        final IReplicationStrategy strategy = ReplicationStrategyFactory.create(cluster);
+        return strategy;
+    }
+
+    private String getNodeFullName(String nodeId) {
+        if (nodeId.startsWith(nodeNamePrefix)) {
+            return nodeId;
         }
-        return false;
+        return nodeNamePrefix + nodeId;
     }
 
-    public boolean isAutoFailoverEnabled() {
-        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+    private void updateNodeIdToFullName() {
+        cluster.getNode().forEach(node -> node.setId(getNodeFullName(node.getId())));
+        if (cluster.getMetadataNode() != null) {
+            cluster.setMetadataNode(getNodeFullName(cluster.getMetadataNode()));
+        }
+        if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getFaultTolerance() != null
+                && cluster.getHighAvailability().getFaultTolerance().getReplica() != null) {
+            Replica replicas = cluster.getHighAvailability().getFaultTolerance().getReplica();
+            replicas.setNodeId(replicas.getNodeId().stream().map(this::getNodeFullName).collect(Collectors.toList()));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index 164a525..cf2ce4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -18,11 +18,10 @@
  */
 package org.apache.asterix.common.config;
 
-import java.util.HashSet;
 import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -31,16 +30,8 @@ import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 public class ReplicationProperties extends AbstractProperties {
 
-    private static final Logger LOGGER = Logger.getLogger(ReplicationProperties.class.getName());
-
-
     private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
 
-    private static final String REPLICATION_ENABLED_KEY = "replication.enabled";
-
-    private static final String REPLICATION_FACTOR_KEY = "replication.factor";
-    private static final int REPLICATION_FACTOR_DEFAULT = 1;
-
     private static final String REPLICATION_TIMEOUT_KEY = "replication.timeout";
     private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
 
@@ -60,205 +51,60 @@ public class ReplicationProperties extends AbstractProperties {
     private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
             StorageUnit.KILOBYTE);
 
-    private final String nodeNamePrefix;
     private final Cluster cluster;
+    private final IReplicationStrategy repStrategy;
 
     public ReplicationProperties(PropertiesAccessor accessor) {
         super(accessor);
         this.cluster = ClusterProperties.INSTANCE.getCluster();
-
-        if (cluster != null) {
-            nodeNamePrefix = cluster.getInstanceName() + "_";
-        } else {
-            nodeNamePrefix = "";
-        }
-    }
-
-    @PropertyKey(REPLICATION_ENABLED_KEY)
-    public boolean isReplicationEnabled() {
-        return ClusterProperties.INSTANCE.isReplicationEnabled();
+        this.repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
     }
 
     public String getReplicaIPAddress(String nodeId) {
-        if (cluster != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    return node.getClusterIp();
-                }
-            }
-        }
-        return NODE_IP_ADDRESS_DEFAULT;
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        return node != null ? node.getClusterIp() : NODE_IP_ADDRESS_DEFAULT;
     }
 
     public int getDataReplicationPort(String nodeId) {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
-                            : cluster.getDataReplication().getReplicationPort().intValue();
-                }
-            }
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        if (node != null) {
+            return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+                    : cluster.getHighAvailability().getDataReplication().getReplicationPort().intValue();
         }
         return REPLICATION_DATAPORT_DEFAULT;
     }
 
-    public Set<Replica> getRemoteReplicas(String nodeId) {
-        Set<Replica> remoteReplicas = new HashSet<>();;
-
-        int numberOfRemoteReplicas = getReplicationFactor() - 1;
-        //Using chained-declustering
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING,
-                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
-                return null;
-            }
-
-            //find nodes to the right of this node
-            for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
-                remoteReplicas.add(getReplicaByNodeIndex(i));
-                if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                    break;
-                }
-            }
-
-            //if not all remote replicas have been found, start from the beginning
-            if (remoteReplicas.size() != numberOfRemoteReplicas) {
-                for (int i = 0; i < cluster.getNode().size(); i++) {
-                    remoteReplicas.add(getReplicaByNodeIndex(i));
-                    if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                        break;
-                    }
-                }
-            }
-        }
-        return remoteReplicas;
-    }
-
-    private Replica getReplicaByNodeIndex(int nodeIndex) {
-        Node node = cluster.getNode().get(nodeIndex);
-        Node replicaNode = new Node();
-        replicaNode.setId(getRealCluserNodeID(node.getId()));
-        replicaNode.setClusterIp(node.getClusterIp());
-        return new Replica(replicaNode);
-    }
-
     public Replica getReplicaById(String nodeId) {
-        int nodeIndex = -1;
-        if (cluster != null) {
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
+        Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+        if (node != null) {
+            return new Replica(node);
         }
-
-        if (nodeIndex < 0) {
-            return null;
-        }
-
-        return getReplicaByNodeIndex(nodeIndex);
+        return null;
     }
 
     public Set<String> getRemoteReplicasIds(String nodeId) {
-        Set<String> remoteReplicasIds = new HashSet<>();
-        Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
-
-        for (Replica replica : remoteReplicas) {
-            remoteReplicasIds.add(replica.getId());
-        }
-
-        return remoteReplicasIds;
+        return repStrategy.getRemoteReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
     }
 
-    public String getRealCluserNodeID(String nodeId) {
-        return nodeNamePrefix + nodeId;
+    public Set<String> getRemotePrimaryReplicasIds(String nodeId) {
+        return repStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
     }
 
     public Set<String> getNodeReplicasIds(String nodeId) {
-        Set<String> replicaIds = new HashSet<>();
-        replicaIds.add(nodeId);
-        replicaIds.addAll(getRemoteReplicasIds(nodeId));
-        return replicaIds;
-    }
-
-    @PropertyKey(REPLICATION_FACTOR_KEY)
-    public int getReplicationFactor() {
-        if (cluster != null) {
-            if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
-                return REPLICATION_FACTOR_DEFAULT;
-            }
-            return cluster.getDataReplication().getReplicationFactor().intValue();
-        }
-        return REPLICATION_FACTOR_DEFAULT;
+        Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId);
+        // This includes the node itself
+        remoteReplicasIds.add(nodeId);
+        return remoteReplicasIds;
     }
 
     @PropertyKey(REPLICATION_TIMEOUT_KEY)
     public int getReplicationTimeOut() {
         if (cluster != null) {
-            return cluster.getDataReplication().getReplicationTimeOut().intValue();
+            return cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue();
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
 
-    /**
-     * @param nodeId
-     * @return The set of nodes which replicate to this node, including the node itself
-     */
-    public Set<String> getNodeReplicationClients(String nodeId) {
-        Set<String> clientReplicas = new HashSet<>();
-        clientReplicas.add(nodeId);
-
-        int clientsCount = getReplicationFactor();
-
-        //Using chained-declustering backwards
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            //find nodes to the left of this node
-            for (int i = nodeIndex - 1; i >= 0; i--) {
-                clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                if (clientReplicas.size() == clientsCount) {
-                    break;
-                }
-            }
-
-            //if not all client replicas have been found, start from the end
-            if (clientReplicas.size() != clientsCount) {
-                for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
-                    clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                    if (clientReplicas.size() == clientsCount) {
-                        break;
-                    }
-                }
-            }
-        }
-        return clientReplicas;
-    }
-
     @PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY)
     public int getMaxRemoteRecoveryAttempts() {
         return MAX_REMOTE_RECOVERY_ATTEMPTS;
@@ -281,4 +127,12 @@ public class ReplicationProperties extends AbstractProperties {
         return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
                 PropertyInterpreters.getIntegerBytePropertyInterpreter());
     }
-}
+
+    /**
+     * Delegates to the configured replication strategy.
+     *
+     * @param nodeId
+     *            the node to check
+     * @return whatever the strategy's {@code isParticipant} reports for {@code nodeId}
+     */
+    public boolean isParticipant(String nodeId) {
+        final boolean participant = repStrategy.isParticipant(nodeId);
+        return participant;
+    }
+
+    /**
+     * @return the replication strategy obtained from {@code ClusterProperties} when this object was constructed
+     */
+    public IReplicationStrategy getReplicationStrategy() {
+        return this.repStrategy;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f49b07a..cbb4868 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -30,6 +30,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.Resource;
@@ -571,4 +572,13 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     public int getNumPartitions() {
         return numPartitions;
     }
+
+    /**
+     * Flushes the open indexes of every registered dataset that {@code replicationStrategy} matches.
+     *
+     * @param replicationStrategy
+     *            decides, per dataset id, whether the dataset is flushed
+     * @throws HyracksDataException
+     *             propagated from flushing a dataset's open indexes
+     */
+    @Override
+    public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+        for (DatasetResource resource : datasets.values()) {
+            if (!replicationStrategy.isMatch(resource.getDatasetID())) {
+                // not covered by the strategy; leave it untouched
+                continue;
+            }
+            // NOTE(review): the meaning of the 'false' flag is defined by flushDatasetOpenIndexes, not visible here
+            flushDatasetOpenIndexes(resource.getDatasetInfo(), false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
new file mode 100644
index 0000000..ad326b2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+/**
+ * Replication strategy that treats the cluster nodes, in the order they appear in the cluster configuration, as a
+ * ring: a node replicates to the {@code replicationFactor - 1} nodes that follow it and, symmetrically, receives
+ * replicas from the same number of nodes that precede it. Every dataset is replicated.
+ */
+public class ChainedDeclusteringReplicationStrategy implements IReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName());
+    private int replicationFactor;
+
+    /**
+     * @param datasetId
+     * @return always {@code true}; this strategy does not filter datasets.
+     */
+    @Override
+    public boolean isMatch(int datasetId) {
+        return true;
+    }
+
+    /**
+     * @param nodeId
+     * @return the nodes that follow {@code nodeId} in the ring (wrapping around at the end of the node list), or
+     *         an empty set if {@code nodeId} is not part of the cluster configuration.
+     */
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        return getRingNeighbours(nodeId, 1);
+    }
+
+    /**
+     * @param nodeId
+     * @return the nodes that precede {@code nodeId} in the ring (wrapping around at the start of the node list),
+     *         or an empty set if {@code nodeId} is not part of the cluster configuration.
+     */
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return getRingNeighbours(nodeId, -1);
+    }
+
+    /**
+     * Collects the neighbours of {@code nodeId} while walking the ring in one direction.
+     *
+     * @param nodeId
+     *            the node whose neighbours are requested
+     * @param direction
+     *            {@code 1} to walk towards later nodes, {@code -1} to walk towards earlier nodes
+     * @return up to {@code replicationFactor - 1} distinct neighbours, never including the node itself
+     */
+    private Set<Replica> getRingNeighbours(String nodeId, int direction) {
+        final int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId);
+        if (nodeIndex == -1) {
+            LOGGER.log(Level.WARNING, "Could not find node " + nodeId + " in cluster configurations");
+            return Collections.emptySet();
+        }
+
+        final Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        final int clusterSize = cluster.getNode().size();
+        // A node has at most clusterSize - 1 distinct neighbours; clamping also keeps a non-positive or oversized
+        // replication factor from looping forever.
+        final int neighbourCount = Math.max(0, Math.min(replicationFactor - 1, clusterSize - 1));
+
+        final Set<Replica> neighbours = new HashSet<>();
+        for (int distance = 1; distance <= neighbourCount; distance++) {
+            // floorMod wraps negative positions to the end of the list (index -1 becomes clusterSize - 1)
+            final int position = Math.floorMod(nodeIndex + direction * distance, clusterSize);
+            neighbours.add(new Replica(cluster.getNode().get(position)));
+        }
+        return neighbours;
+    }
+
+    /**
+     * @param cluster
+     * @return a new strategy configured with the replication factor found in {@code cluster}.
+     * @throws IllegalStateException
+     *             if the cluster configuration does not specify a replication factor
+     */
+    @Override
+    public ChainedDeclusteringReplicationStrategy from(Cluster cluster) {
+        if (cluster.getHighAvailability().getDataReplication().getReplicationFactor() == null) {
+            throw new IllegalStateException("Replication factor must be specified.");
+        }
+        ChainedDeclusteringReplicationStrategy cd = new ChainedDeclusteringReplicationStrategy();
+        cd.replicationFactor = cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue();
+        return cd;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
new file mode 100644
index 0000000..46d5d98
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFaultToleranceStrategy {
+
+    /**
+     * Defines the logic of a {@link IFaultToleranceStrategy} when a node joins the cluster.
+     *
+     * @param nodeId
+     *            the id of the node that joined
+     * @throws HyracksDataException
+     */
+    void notifyNodeJoin(String nodeId) throws HyracksDataException;
+
+    /**
+     * Defines the logic of a {@link IFaultToleranceStrategy} when a node leaves the cluster.
+     *
+     * @param nodeId
+     *            the id of the node that left
+     * @throws HyracksDataException
+     */
+    void notifyNodeFailure(String nodeId) throws HyracksDataException;
+
+    /**
+     * Binds the fault tolerance strategy to {@code clusterManager}.
+     *
+     * @param clusterManager
+     *            the cluster state manager to bind to
+     */
+    void bindTo(IClusterStateManager clusterManager);
+
+    /**
+     * Processes {@code message} based on the message type.
+     *
+     * @param message
+     *            the lifecycle message to process
+     * @throws HyracksDataException
+     */
+    void process(INCLifecycleMessage message) throws HyracksDataException;
+
+    /**
+     * Constructs a fault tolerance strategy.
+     *
+     * @param replicationStrategy
+     *            the replication strategy the constructed strategy is based on
+     * @param messageBroker
+     *            the message broker handed to the constructed strategy
+     * @return the constructed fault tolerance strategy
+     */
+    IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
new file mode 100644
index 0000000..c19b0aa
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+
+/**
+ * An {@link IApplicationMessage} that identifies itself through a {@link MessageType}.
+ */
+public interface INCLifecycleMessage extends IApplicationMessage {
+
+    /**
+     * The kinds of lifecycle messages. Most constants come in request/response pairs.
+     */
+    enum MessageType {
+        REPLAY_LOGS_REQUEST,
+        REPLAY_LOGS_RESPONSE,
+        PREPARE_FAILBACK_REQUEST,
+        PREPARE_FAILBACK_RESPONSE,
+        COMPLETE_FAILBACK_REQUEST,
+        COMPLETE_FAILBACK_RESPONSE,
+        STARTUP_TASK_REQUEST,
+        STARTUP_TASK_RESPONSE,
+        STARTUP_TASK_RESULT,
+        TAKEOVER_PARTITION_REQUEST,
+        TAKEOVER_PARTITION_RESPONSE,
+        TAKEOVER_METADATA_NODE_REQUEST,
+        TAKEOVER_METADATA_NODE_RESPONSE
+    }
+
+    /**
+     * @return The type of this message.
+     */
+    MessageType getType();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 9f9d74b..51b826b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -19,8 +19,11 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRemoteRecoveryManager {
 
@@ -46,4 +49,23 @@ public interface IRemoteRecoveryManager {
      * @throws InterruptedException
      */
     public void completeFailbackProcess() throws IOException, InterruptedException;
+
+    /**
+     * Replays all committed jobs logs for {@code partitions}. Optionally, flushes all datasets
+     * to convert the replayed logs into LSM Components.
+     *
+     * @param partitions
+     *            the partitions whose committed job logs are replayed
+     * @param flush
+     *            if true, all datasets are flushed after the replay
+     * @throws HyracksDataException
+     */
+    void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException;
+
+    /**
+     * Performs the remote recovery plan by requesting data from each specified node
+     * for each of the partitions specified for that node.
+     *
+     * @param recoveryPlan
+     *            maps each node to request data from (presumably by node id; confirm against callers) to the
+     *            partitions to request from it
+     * @throws HyracksDataException
+     */
+    void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 6bd1505..b969bef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -50,13 +50,13 @@ public interface IReplicationManager extends IIOReplicationManager {
      *
      * @param remoteReplicaId
      *            The replica id to send the request to.
-     * @param replicasDataToRecover
-     *            Get files that belong to those replicas.
+     * @param partitionsToRecover
+     *            Get files that belong to those partitions.
      * @param existingFiles
      *            a list of already existing files on the requester
      * @throws IOException
      */
-    public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover,
+    public void requestReplicaFiles(String remoteReplicaId, Set<Integer> partitionsToRecover,
             Set<String> existingFiles) throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
new file mode 100644
index 0000000..f65f6ac
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+/**
+ * Decides which datasets are replicated and which nodes exchange replicas with a given node.
+ */
+public interface IReplicationStrategy {
+
+    /**
+     * @param datasetId
+     *            the id of the dataset to check
+     * @return True, if the dataset should be replicated. Otherwise false.
+     */
+    boolean isMatch(int datasetId);
+
+    /**
+     * @param nodeId
+     *            the node holding the replicas
+     * @return The set of nodes whose data is replicated onto {@code nodeId}.
+     */
+    Set<Replica> getRemotePrimaryReplicas(String nodeId);
+
+    /**
+     * @param node
+     *            the node whose data is replicated
+     * @return The set of nodes that {@code node} replicates data to.
+     */
+    Set<Replica> getRemoteReplicas(String node);
+
+    /**
+     * @param nodeId
+     *            the node to check
+     * @return true if {@code nodeId} has any remote primary replica or remote replica. Otherwise false.
+     */
+    default boolean isParticipant(String nodeId) {
+        // a node is isolated when it neither replicates to anyone nor holds replicas of anyone
+        final boolean isolated = getRemoteReplicas(nodeId).isEmpty() && getRemotePrimaryReplicas(nodeId).isEmpty();
+        return !isolated;
+    }
+
+    /**
+     * @param cluster
+     *            the cluster configuration to read the strategy's settings from
+     * @return A replication strategy based on the passed configurations.
+     */
+    IReplicationStrategy from(Cluster cluster);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
new file mode 100644
index 0000000..711f06d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class MetadataOnlyReplicationStrategy implements IReplicationStrategy {
+
+    private String metadataNodeId;
+    private Replica metadataPrimaryReplica;
+    private Set<Replica> metadataNodeReplicas;
+
+    @Override
+    public boolean isMatch(int datasetId) {
+        return datasetId < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId >= 0;
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        if (nodeId.equals(metadataNodeId)) {
+            return metadataNodeReplicas;
+        }
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        if (metadataNodeReplicas.stream().map(Replica::getId).filter(replicaId -> replicaId.equals(nodeId))
+                .count() != 0) {
+            return new HashSet<>(Arrays.asList(metadataPrimaryReplica));
+        }
+        return Collections.emptySet();
+    }
+
+    @Override
+    public MetadataOnlyReplicationStrategy from(Cluster cluster) {
+        if (cluster.getMetadataNode() == null) {
+            throw new IllegalStateException("Metadata node must be specified.");
+        }
+
+        Node metadataNode = ClusterProperties.INSTANCE.getNodeById(cluster.getMetadataNode());
+        if (metadataNode == null) {
+            throw new IllegalStateException("Invalid metadata node specified");
+        }
+
+        if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null
+                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null
+                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) {
+            throw new IllegalStateException("One or more replicas must be specified for metadata node.");
+        }
+
+        final Set<Replica> replicas = new HashSet<>();
+        for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
+            Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+            if (node == null) {
+                throw new IllegalStateException("Invalid replica specified: " + nodeId);
+            }
+            replicas.add(new Replica(node));
+        }
+        MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy();
+        st.metadataNodeId = cluster.getMetadataNode();
+        st.metadataPrimaryReplica = new Replica(metadataNode);
+        st.metadataNodeReplicas = replicas;
+        return st;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
new file mode 100644
index 0000000..43347f6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+/**
+ * A strategy under which nothing is replicated: no dataset matches, no node participates and every
+ * replica lookup yields an empty set.
+ */
+public class NoReplicationStrategy implements IReplicationStrategy {
+
+    /**
+     * @param cluster
+     *            not consulted by this strategy
+     * @return a new {@link NoReplicationStrategy}
+     */
+    @Override
+    public NoReplicationStrategy from(Cluster cluster) {
+        return new NoReplicationStrategy();
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean isMatch(int datasetId) {
+        return false;
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return false;
+    }
+
+    /**
+     * @return always an empty set
+     */
+    @Override
+    public Set<Replica> getRemoteReplicas(String node) {
+        return Collections.emptySet();
+    }
+
+    /**
+     * @return always an empty set
+     */
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return Collections.emptySet();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index bd77778..267a22d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -36,11 +36,13 @@ public class Replica {
         UNKNOWN
     }
 
-    final Node node;
+    private final Node node;
     private ReplicaState state = ReplicaState.UNKNOWN;
 
     public Replica(Node node) {
-        this.node = node;
+        this.node = new Node();
+        this.node.setId(node.getId());
+        this.node.setClusterIp(node.getClusterIp());
     }
 
     public ReplicaState getState() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
new file mode 100644
index 0000000..b61b38a
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+/**
+ * Creates the {@link IReplicationStrategy} selected by a cluster's high availability configuration.
+ */
+public class ReplicationStrategyFactory {
+
+    // Maps the lower-case strategy name used in the configuration to its implementation class.
+    private static final Map<String, Class<? extends IReplicationStrategy>>
+    BUILT_IN_REPLICATION_STRATEGY = new HashMap<>();
+
+    static {
+        BUILT_IN_REPLICATION_STRATEGY.put("no_replication", NoReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", ChainedDeclusteringReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", MetadataOnlyReplicationStrategy.class);
+    }
+
+    // Static utility class: instantiation is a programming error.
+    private ReplicationStrategyFactory() {
+        throw new AssertionError();
+    }
+
+    /**
+     * Picks and configures the replication strategy for a cluster.
+     *
+     * @param cluster
+     *            the cluster configuration to read
+     * @return a {@link NoReplicationStrategy} when high availability is absent or not enabled, or when
+     *         no data replication strategy is configured; otherwise the named built-in strategy
+     *         configured from {@code cluster}
+     * @throws IllegalArgumentException
+     *             if the configured strategy name is not one of the built-in names
+     * @throws IllegalStateException
+     *             if the strategy class cannot be instantiated
+     */
+    public static IReplicationStrategy create(Cluster cluster) {
+        boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+                && cluster.getHighAvailability().getEnabled() != null
+                && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+        if (!highAvailabilityEnabled || cluster.getHighAvailability().getDataReplication() == null
+                || cluster.getHighAvailability().getDataReplication().getStrategy() == null) {
+            return new NoReplicationStrategy();
+        }
+        // Lower-case with a fixed locale: the default locale may map letters differently (for example
+        // 'I' under a Turkish locale), which would make a valid name miss the lookup below.
+        String strategyName =
+                cluster.getHighAvailability().getDataReplication().getStrategy().toLowerCase(Locale.ROOT);
+        if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
+            throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s",
+                    BUILT_IN_REPLICATION_STRATEGY.keySet().toString()));
+        }
+        Class<? extends IReplicationStrategy> clazz = BUILT_IN_REPLICATION_STRATEGY.get(strategyName);
+        try {
+            // Constructor.newInstance wraps anything the constructor throws, unlike Class.newInstance
+            // which lets checked exceptions escape undeclared.
+            return clazz.getDeclaredConstructor().newInstance().from(cluster);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
new file mode 100644
index 0000000..ca6968f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.File;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+
+/**
+ * Immutable holder for the properties that identify one index file: its partition, dataverse,
+ * index name, file name and the id of the dataset it belongs to.
+ */
+public class IndexFileProperties {
+
+    private final int partitionId;
+    private final String dataverseName;
+    private final String idxName;
+    private final String fileName;
+    private final int datasetId;
+
+    /**
+     * @param partitionId
+     *            the partition the file is stored in
+     * @param dataverseName
+     *            the dataverse of the index
+     * @param idxName
+     *            the index name
+     * @param fileName
+     *            the name of the file itself
+     * @param datasetId
+     *            the id of the dataset the index belongs to
+     */
+    public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) {
+        this.datasetId = datasetId;
+        this.fileName = fileName;
+        this.idxName = idxName;
+        this.dataverseName = dataverseName;
+        this.partitionId = partitionId;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getIdxName() {
+        return idxName;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+
+    /**
+     * @return the partition directory, dataverse, index and file names joined by the platform file
+     *         separator, followed by the dataset id in square brackets
+     */
+    @Override
+    public String toString() {
+        final String relativePath = StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator
+                + dataverseName + File.separator + idxName + File.separator + fileName;
+        return relativePath + " [Dataset ID: " + datasetId + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 6816116..3e85276 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public interface IRecoveryManager {
 
     public enum SystemState {
+        INITIAL_RUN,
         NEW_UNIVERSE,
         RECOVERING,
         HEALTHY,
@@ -120,4 +121,6 @@ public interface IRecoveryManager {
      * Deletes all temporary recovery files
      */
     public void deleteRecoveryTemporaryFiles();
+
+    void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9f2e3e7..46cd476 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -78,6 +78,17 @@ public class StoragePathUtil {
     }
 
     /**
+     * @param fileAbsolutePath
+     * @return the file relative path starting from the partition directory
+     */
+    public static String getIndexFileRelativePath(String fileAbsolutePath) {
+        // String.split takes a regular expression, and File.separator is "\" on Windows, which is not a
+        // valid pattern on its own; quote it so the separator is always matched literally.
+        String[] tokens = fileAbsolutePath.split(java.util.regex.Pattern.quote(File.separator));
+        //partition/dataverse/idx/fileName
+        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+    }
+
+    /**
      * Create a file
      * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
      * creating files simultaneously

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
index 79c377a..098b4e7 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
@@ -44,7 +44,7 @@
     <xs:element name="http_port" type="xs:integer" />
     <xs:element name="debug_port" type="xs:integer" />
     <xs:element name="metadata_node" type="xs:string" />
-    <xs:element name="enabled" type="xs:boolean" />
+    <xs:element name="enabled" type="xs:string" />
     <xs:element name="replication_port" type="xs:integer" />
     <xs:element name="replication_factor" type="xs:integer" />
     <xs:element name="auto_failover" type="xs:boolean" />
@@ -57,6 +57,8 @@
     <xs:element name="result_time_to_live" type="xs:long" />
     <xs:element name="result_sweep_threshold" type="xs:long" />
     <xs:element name="cc_root" type="xs:string" />
+    <xs:element name="strategy" type="xs:string" />
+    <xs:element name="node_id" type="xs:string" />
 
     <!-- definition of complex elements -->
     <xs:element name="working_dir">
@@ -87,15 +89,33 @@
     <xs:element name="data_replication">
         <xs:complexType>
             <xs:sequence>
-                <xs:element ref="cl:enabled" />
+                <xs:element ref="cl:strategy" />
                 <xs:element ref="cl:replication_port" />
                 <xs:element ref="cl:replication_factor" />
-                <xs:element ref="cl:auto_failover" />
                 <xs:element ref="cl:replication_time_out" />
             </xs:sequence>
         </xs:complexType>
     </xs:element>
 
+    <!-- fault_tolerance: a strategy name followed by an optional replica element -->
+    <xs:element name="fault_tolerance">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:strategy" />
+                <xs:element ref="cl:replica" minOccurs="0"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <!-- high_availability: enabled, data_replication and fault_tolerance, in that order; each may be omitted -->
+    <xs:element name="high_availability">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:enabled" minOccurs="0"/>
+                <xs:element ref="cl:data_replication" minOccurs="0"/>
+                <xs:element ref="cl:fault_tolerance" minOccurs="0"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
     <xs:element name="property">
         <xs:complexType>
             <xs:sequence>
@@ -136,6 +156,14 @@
         </xs:complexType>
     </xs:element>
 
+    <!-- replica: a list of one or more node_id elements -->
+    <xs:element name="replica">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="cl:node_id" maxOccurs="unbounded" />
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
     <xs:element name="cluster">
         <xs:complexType>
             <xs:sequence>
@@ -150,7 +178,7 @@
                 <xs:element ref="cl:iodevices" minOccurs="0" />
                 <xs:element ref="cl:working_dir" />
                 <xs:element ref="cl:metadata_node" />
-                <xs:element ref="cl:data_replication" minOccurs="0" />
+                <xs:element ref="cl:high_availability" minOccurs="0" />
                 <xs:element ref="cl:master_node" />
                 <xs:element ref="cl:node" maxOccurs="unbounded" />
                 <xs:element ref="cl:substitute_nodes" />

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 288a739..8213213 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -76,7 +76,6 @@ import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.http.util.EntityUtils;
 import org.apache.hyracks.util.StorageUtil;
 import org.junit.Assert;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonMappingException;
@@ -1108,6 +1107,21 @@ public class TestExecutor {
         }
         ProcessBuilder pb = new ProcessBuilder("kill", "-9", Integer.toString(nodePid));
         pb.start().waitFor();
+        // Delete NC's transaction logs to re-initialize it as a new NC.
+        deleteNCTxnLogs(nodeId, cUnit);
+    }
+
+    /**
+     * Deletes the transaction log directory of a node. The directory is looked up under
+     * "transaction.log.dirs" in the JSON returned by the /admin/cluster endpoint and removed with
+     * {@code rm -rf}.
+     *
+     * @param nodeId
+     *            the id of the node whose log directory is removed
+     * @param cUnit
+     *            the compilation unit that determines the output format of the request
+     * @throws Exception
+     *             if the request, the JSON parsing or the delete process fails
+     */
+    private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception {
+        OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
+        String endpoint = "/admin/cluster";
+        URI clusterUri = new URI("http://" + host + ":" + port + endpoint);
+        StringWriter actual = new StringWriter();
+        // Close the response stream once it has been copied so it is not leaked.
+        try (InputStream resultStream = executeJSONGet(fmt, clusterUri)) {
+            IOUtils.copy(resultStream, actual, StandardCharsets.UTF_8);
+        }
+        String config = actual.toString();
+        ObjectMapper om = new ObjectMapper();
+        String logDir = om.readTree(config).findPath("transaction.log.dirs").get(nodeId).asText();
+        ProcessBuilder pb = new ProcessBuilder("rm", "-rf", logDir);
+        pb.start().waitFor();
     }
 
     public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
index 6d2b4b9..eac7586 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
@@ -44,7 +44,8 @@ public class ConfigureCommand extends AbstractCommand {
     @Override
     protected void execCommand() throws Exception {
         configureCluster("local", "local.xml");
-        configureCluster("local", "local_with_replication.xml");
+        configureCluster("local", "local_chained_declustering_rep.xml");
+        configureCluster("local", "local_metadata_only_rep.xml");
         configureCluster("demo", "demo.xml");
 
         String installerConfPath = InstallerDriver.getManagixHome() + File.separator + InstallerDriver.MANAGIX_CONF_XML;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 4037eaf..748d811 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -262,40 +262,16 @@ public class ValidateCommand extends AbstractCommand {
         boolean valid = true;
 
         //if replication is disabled, no need to validate the settings
-        if (cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
-
-            if (cluster.getDataReplication().getReplicationFactor() == null) {
-                if (cluster.getNode().size() >= 3) {
-                    LOGGER.warn("Replication factor not defined. Using default value (3) " + WARNING);
-
-                } else {
-                    valid = false;
-                    LOGGER.fatal("Replication factor not defined for data repliaction. " + ERROR);
-                }
-
-            }
-
-            //replication factor = 1 means no replication
-            if (cluster.getDataReplication().getReplicationFactor().intValue() == 1) {
-                LOGGER.warn("Replication factor is set to 1. Disabling data replication" + WARNING);
-                return true;
-            }
-
-            if (cluster.getDataReplication().getReplicationFactor().intValue() > cluster.getNode().size()) {
-                LOGGER.fatal("Replication factor = " + cluster.getDataReplication().getReplicationFactor().intValue()
-                        + "  requires at least " + cluster.getDataReplication().getReplicationFactor().intValue()
-                        + " nodes in the cluster" + ERROR);
-                valid = false;
-            }
-
-            if (cluster.getDataReplication().getReplicationPort() == null
-                    || cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
+        if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getDataReplication() != null) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationPort() == null || cluster
+                    .getHighAvailability().getDataReplication().getReplicationPort().toString().length() == 0) {
                 valid = false;
                 LOGGER.fatal("Replication data port not defined for data repliaction. " + ERROR);
             }
 
-            if (cluster.getDataReplication().getReplicationTimeOut() == null
-                    || (cluster.getDataReplication().getReplicationTimeOut().intValue() + "").length() == 0) {
+            if (cluster.getHighAvailability().getDataReplication().getReplicationTimeOut() == null || String
+                    .valueOf(cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue())
+                    .length() == 0) {
                 LOGGER.warn("Replication maximum wait time not defined. Using default value (60 seconds) " + WARNING);
             }
 


Mime
View raw message