asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [6/6] incubator-asterixdb git commit: Asterix NCs Failback Support
Date Thu, 18 Feb 2016 09:54:32 GMT
Asterix NCs Failback Support

- Allow failed NCs to fail back and take over their partitions.
- New cluster API servlet for cluster state description.
- Remove nodeId from txn logs except remote FLUSH_LOG.
- Add partition id to UPDATE and ENTITY_COMMIT logs.
- Adapt remote recovery to new logs format.
- Refactor RecoveryManager and split the Analysis and Redo phases.
- Spill remote recovery logs to a temporary file.
- Replicate files to remote replicas for partitions of interest only.
- Introduce NC active/inactive partitions concept.
- Test case for failback.

Change-Id: Id17819542d6b9c4e32647e64737c4a467b630f24
Reviewed-on: https://asterix-gerrit.ics.uci.edu/613
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/98d38e6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/98d38e6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/98d38e6a

Branch: refs/heads/master
Commit: 98d38e6a0a3199a33e06a7665c8a587841b260c3
Parents: c318249
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Wed Feb 17 23:53:09 2016 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Thu Feb 18 01:49:33 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 .../operators/physical/CommitPOperator.java     |   21 +-
 .../operators/physical/CommitRuntime.java       |   17 +-
 .../physical/CommitRuntimeFactory.java          |    9 +-
 .../operators/physical/UpsertCommitRuntime.java |   13 +-
 .../rules/ReplaceSinkOpWithCommitOpRule.java    |   13 +-
 .../api/common/AsterixAppRuntimeContext.java    |   51 +-
 .../api/http/servlet/ClusterAPIServlet.java     |   51 +
 .../bootstrap/CCApplicationEntryPoint.java      |   54 +-
 .../bootstrap/ClusterLifecycleListener.java     |   17 +-
 .../bootstrap/NCApplicationEntryPoint.java      |   70 +-
 .../asterix/messaging/CCMessageBroker.java      |   24 +-
 .../asterix/messaging/NCMessageBroker.java      |  159 ++-
 .../common/api/IAsterixAppRuntimeContext.java   |   11 +-
 .../common/api/IClusterManagementWork.java      |    3 +-
 .../common/api/IDatasetLifecycleManager.java    |    7 +
 .../config/AsterixReplicationProperties.java    |   56 +-
 .../common/context/DatasetLifecycleManager.java |   19 +-
 ...erixLSMInsertDeleteOperatorNodePushable.java |    3 +-
 .../messaging/AbstractFailbackPlanMessage.java  |   39 +
 .../CompleteFailbackRequestMessage.java         |   56 +
 .../CompleteFailbackResponseMessage.java        |   49 +
 ...PreparePartitionsFailbackRequestMessage.java |   65 ++
 ...reparePartitionsFailbackResponseMessage.java |   41 +
 .../common/messaging/ReplicaEventMessage.java   |   52 +
 .../TakeoverPartitionsRequestMessage.java       |   10 +-
 .../messaging/api/IApplicationMessage.java      |    7 +-
 .../replication/IRemoteRecoveryManager.java     |   13 +-
 .../replication/IReplicaResourcesManager.java   |    8 +-
 .../common/replication/IReplicationManager.java |   22 +-
 .../common/replication/NodeFailbackPlan.java    |  209 ++++
 .../common/replication/ReplicaEvent.java        |   23 +-
 .../asterix/common/transactions/ILogRecord.java |   20 +-
 .../common/transactions/IRecoveryManager.java   |   43 +-
 .../asterix/common/transactions/LogRecord.java  |  214 ++--
 .../asterix/common/utils/ServletUtil.java       |   54 +
 .../asterix/common/utils/StoragePathUtil.java   |    4 +
 .../asterix/common/utils/TransactionUtil.java   |   34 +-
 .../apache/asterix/test/aql/TestExecutor.java   |   92 +-
 .../asterix/installer/test/ReplicationIT.java   |   14 +-
 .../node_failback/node_failback.1.ddl.aql       |   59 +
 .../node_failback/node_failback.10.cstate.aql   |   29 +
 .../node_failback/node_failback.11.query.aql    |   33 +
 .../node_failback/node_failback.2.update.aql    |   35 +
 .../node_failback/node_failback.3.vscript.aql   |    1 +
 .../node_failback/node_failback.4.sleep.aql     |    1 +
 .../node_failback/node_failback.5.cstate.aql    |   29 +
 .../node_failback/node_failback.6.query.aql     |   33 +
 .../node_failback/node_failback.7.update.aql    |   51 +
 .../node_failback/node_failback.8.vmgx.aql      |    1 +
 .../node_failback/node_failback.9.sleep.aql     |    1 +
 .../failover/bulkload/bulkload.2.update.aql     |    2 +-
 .../bulkload/bulkload.4.vagrant_script.aql      |    1 -
 .../failover/bulkload/bulkload.4.vscript.aql    |    1 +
 .../mem_component_recovery.2.update.aql         |    2 +-
 .../mem_component_recovery.4.vagrant_script.aql |    1 -
 .../mem_component_recovery.4.vscript.aql        |    1 +
 .../metadata_node.3.vagrant_script.aql          |    1 -
 .../metadata_node/metadata_node.3.vscript.aql   |    1 +
 .../node_failback.cluster_state.10.adm          |    1 +
 .../node_failback.cluster_state.5.adm           |    1 +
 .../node_failback/node_failback.query.11.adm    |    1 +
 .../node_failback/node_failback.query.6.adm     |    1 +
 .../integrationts/replication/testsuite.xml     |   41 +-
 .../apache/asterix/metadata/MetadataNode.java   |   26 +-
 .../om/util/AsterixClusterProperties.java       |  407 +++++--
 .../functions/ReplicaFilesRequest.java          |   22 +-
 .../functions/ReplicationProtocol.java          |   57 +-
 .../management/ReplicaEventNotifier.java        |  109 --
 .../management/ReplicationChannel.java          |  187 +--
 .../ReplicationLifecycleListener.java           |   77 --
 .../management/ReplicationManager.java          |  288 ++---
 .../recovery/RemoteRecoveryManager.java         |  221 +++-
 .../storage/LSMIndexFileProperties.java         |   10 +-
 .../storage/ReplicaResourcesManager.java        |   52 +-
 ...rixLSMPrimaryUpsertOperatorNodePushable.java |    5 +-
 ...tractIndexModificationOperationCallback.java |    4 +-
 ...imaryIndexModificationOperationCallback.java |    7 +-
 ...dexModificationOperationCallbackFactory.java |   10 +-
 ...ndaryIndexModificationOperationCallback.java |    5 +-
 ...dexModificationOperationCallbackFactory.java |   10 +-
 ...tasetIndexModificationOperationCallback.java |    5 +-
 ...dexModificationOperationCallbackFactory.java |    6 +-
 ...dexModificationOperationCallbackFactory.java |    6 +-
 .../opcallbacks/UpsertOperationCallback.java    |    7 +-
 .../UpsertOperationCallbackFactory.java         |    5 +-
 .../PersistentLocalResourceRepository.java      |   58 +-
 .../logging/LogManagerWithReplication.java      |    5 +-
 .../management/service/logging/LogReader.java   |    3 +-
 .../service/logging/RemoteLogReader.java        |  138 +++
 .../service/recovery/RecoveryManager.java       | 1099 ++++--------------
 .../management/service/recovery/TxnId.java      |  175 +++
 .../service/transaction/TransactionContext.java |    1 -
 93 files changed, 3087 insertions(+), 1914 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 90efb11..d6ab1f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@ asterix-app/parserts/
 asterix-app/opt_parserts/
 asterix-app/runtime_parserts/
 asterix-installer/ittest/
+asterix-installer/repliationtest/
 build
 asterix_logs
 bin/

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 5112fcf..2d1cf1e 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -37,20 +37,25 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
     private final int datasetId;
+    private final String dataverse;
+    private final String dataset;
     private final LogicalVariable upsertVar;
 
-    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars,
-            LogicalVariable upsertVar) {
+    public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
+            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
+        this.dataverse = dataverse;
+        this.dataset = dataset;
     }
 
     @Override
@@ -84,13 +89,23 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
                 context);
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
+
+        //get dataset splits
+        FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
+                dataverse, dataset, dataset, metadataProvider.isTemporaryDatasetWriteJob());
+        int[] datasetPartitions = new int[splitsForDataset.length];
+        for (int i = 0; i < splitsForDataset.length; i++) {
+            datasetPartitions[i] = splitsForDataset[i].getPartition();
+        }
+
         int upsertVarIdx = -1;
         CommitRuntimeFactory runtime = null;
         if (upsertVar != null) {
             upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
         }
         runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx);
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
+                datasetPartitions);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index b72018a..94519cf 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -28,8 +28,8 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -58,9 +58,10 @@ public class CommitRuntime implements IPushRuntime {
 
     protected ITransactionContext transactionContext;
     protected FrameTupleAccessor frameTupleAccessor;
+    protected final int resourcePartition;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.ctx = ctx;
@@ -72,9 +73,9 @@ public class CommitRuntime implements IPushRuntime {
         this.frameTupleReference = new FrameTupleReference();
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
-        this.longHashes = new long[2];
-        this.logRecord = new LogRecord();
-        logRecord.setNodeId(logMgr.getNodeId());
+        this.resourcePartition = resourcePartition;
+        longHashes = new long[2];
+        logRecord = new LogRecord();
     }
 
     @Override
@@ -109,17 +110,17 @@ public class CommitRuntime implements IPushRuntime {
                 try {
                     formLogRecord(buffer, t);
                     logMgr.log(logRecord);
-                } catch (ACIDException | AlgebricksException e) {
+                } catch (ACIDException e) {
                     throw new HyracksDataException(e);
                 }
             }
         }
     }
 
-    protected void formLogRecord(ByteBuffer buffer, int t) throws AlgebricksException {
+    protected void formLogRecord(ByteBuffer buffer, int t) {
         int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
         TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, frameTupleReference,
-                primaryKeyFields);
+                primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
     }
 
     protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index 7d03796..4f28b9d 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -35,15 +35,17 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     private final boolean isTemporaryDatasetWriteJob;
     private final boolean isWriteTransaction;
     private final int upsertVarIdx;
+    private int[] datasetPartitions;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx) {
+            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
         this.upsertVarIdx = upsertVarIdx;
+        this.datasetPartitions = datasetPartitions;
     }
 
     @Override
@@ -55,10 +57,11 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
         if (upsertVarIdx >= 0) {
             return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, upsertVarIdx);
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
+                    upsertVarIdx);
         } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction);
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()]);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
index 81996d1..7358700 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
@@ -21,22 +21,23 @@ package org.apache.asterix.algebra.operators.physical;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class UpsertCommitRuntime extends CommitRuntime {
     private final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int upsertIdx) throws AlgebricksException {
-        super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction);
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx) {
+        super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
+                resourcePartition);
         this.upsertIdx = upsertIdx;
     }
 
     @Override
-    protected void formLogRecord(ByteBuffer buffer, int t) throws AlgebricksException {
+    protected void formLogRecord(ByteBuffer buffer, int t) {
         boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(),
                 frameTupleAccessor.getFieldSlotsLength() + frameTupleAccessor.getTupleStartOffset(t)
                         + frameTupleAccessor.getFieldStartOffset(t, upsertIdx) + 1);
@@ -46,8 +47,8 @@ public class UpsertCommitRuntime extends CommitRuntime {
         } else {
             // Previous record found (delete + insert)
             int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-            TransactionUtil.formEntityUpsertCommitLogRecord(logRecord, transactionContext, datasetId, pkHash,
-                    frameTupleReference, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash,
+                    frameTupleReference, primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index f8df183..ef8b4a3 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -68,6 +68,8 @@ public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
 
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
         int datasetId = 0;
+        String dataverse = null;
+        String datasetName = null;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) sinkOperator.getInputs().get(0).getValue();
         LogicalVariable upsertVar = null;
         AssignOperator upsertFlagAssign = null;
@@ -79,6 +81,10 @@ public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
                     primaryKeyExprs = indexInsertDeleteUpsertOperator.getPrimaryKeyExpressions();
                     datasetId = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
                             .getDataSource()).getDataset().getDatasetId();
+                    dataverse = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
+                            .getDataSource()).getDataset().getDataverseName();
+                    datasetName = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
+                            .getDataSource()).getDataset().getDatasetName();
                     break;
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
@@ -87,6 +93,10 @@ public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
                     primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
                     datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
                             .getDatasetId();
+                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetName();
                     if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
                         //we need to add a function that checks if previous record was found
                         upsertVar = context.newVar();
@@ -132,7 +142,8 @@ public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
 
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars, upsertVar);
+        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
+                primaryKeyLogicalVars, upsertVar);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 8a40876..5532e79 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.List;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
@@ -105,8 +107,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         }
     }
 
-    private static final int METADATA_IO_DEVICE_ID = 0;
-
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
 
@@ -126,7 +126,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private ITransactionSubsystem txnSubsystem;
 
     private ILSMIOOperationScheduler lsmIOScheduler;
-    private ILocalResourceRepository localResourceRepository;
+    private PersistentLocalResourceRepository localResourceRepository;
     private IResourceIdFactory resourceIdFactory;
     private IIOManager ioManager;
     private boolean isShuttingdown;
@@ -153,6 +153,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         this.metadataRmiPort = metadataRmiPort;
     }
 
+    @Override
     public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
         Logger.getLogger("org.apache").setLevel(externalProperties.getLogLevel());
 
@@ -172,7 +173,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
                 ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
-        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
+        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
                 this);
@@ -183,7 +184,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         SystemState systemState = recoveryMgr.getSystemState();
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             //delete any storage data before the resource factory is initialized
-            ((PersistentLocalResourceRepository) localResourceRepository).deleteStorageData(true);
+            localResourceRepository.deleteStorageData(true);
         }
         initializeResourceIdFactory();
 
@@ -208,7 +209,22 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
             txnSubsystem.getLogManager().setReplicationManager(replicationManager);
 
             //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
-            ((PersistentLocalResourceRepository) localResourceRepository).setReplicationManager(replicationManager);
+            localResourceRepository.setReplicationManager(replicationManager);
+
+            /**
+             * add the partitions that will be replicated in this node as inactive partitions
+             */
+            //get nodes which replicate to this node
+            Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId);
+            //remove the node itself
+            replicationClients.remove(nodeId);
+            for (String clientId : replicationClients) {
+                //get the partitions of each client
+                ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId);
+                for (ClusterPartition partition : clientPartitions) {
+                    localResourceRepository.addInactivePartition(partition.getPartitionId());
+                }
+            }
 
             //initialize replication channel
             replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(),
@@ -220,7 +236,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
             bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
                     storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(),
                     replicationManager);
-
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
                     storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
@@ -251,57 +266,65 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
     }
 
+    @Override
     public boolean isShuttingdown() {
         return isShuttingdown;
     }
 
+    @Override
     public void setShuttingdown(boolean isShuttingdown) {
         this.isShuttingdown = isShuttingdown;
     }
 
+    @Override
     public void deinitialize() throws HyracksDataException {
     }
 
+    @Override
     public IBufferCache getBufferCache() {
         return bufferCache;
     }
 
+    @Override
     public IFileMapProvider getFileMapManager() {
         return fileMapManager;
     }
 
+    @Override
     public ITransactionSubsystem getTransactionSubsystem() {
         return txnSubsystem;
     }
 
+    @Override
     public IDatasetLifecycleManager getDatasetLifecycleManager() {
         return datasetLifecycleManager;
     }
 
+    @Override
     public double getBloomFilterFalsePositiveRate() {
         return storageProperties.getBloomFilterFalsePositiveRate();
     }
 
+    @Override
     public ILSMIOOperationScheduler getLSMIOScheduler() {
         return lsmIOScheduler;
     }
 
+    @Override
     public ILocalResourceRepository getLocalResourceRepository() {
         return localResourceRepository;
     }
 
+    @Override
     public IResourceIdFactory getResourceIdFactory() {
         return resourceIdFactory;
     }
 
+    @Override
     public IIOManager getIOManager() {
         return ioManager;
     }
 
-    public int getMetaDataIODeviceId() {
-        return METADATA_IO_DEVICE_ID;
-    }
-
     @Override
     public AsterixStorageProperties getStorageProperties() {
         return storageProperties;
@@ -352,6 +375,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         return threadExecutor;
     }
 
+    @Override
     public ILSMMergePolicyFactory getMetadataMergePolicyFactory() {
         return metadataMergePolicyFactory;
     }
@@ -421,4 +445,20 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
         ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
     }
+
+    /**
+     * Unexports the {@link MetadataNode#INSTANCE} RMI object so it stops accepting remote calls.
+     * The unexport is not forced, so it fails if calls to the object are pending or in progress.
+     *
+     * @throws RemoteException
+     *             if the object is not currently exported, or if it could not be unexported because
+     *             calls to it are pending or in progress
+     */
+    @Override
+    public void unexportMetadataNodeStub() throws RemoteException {
+        //unexportObject(obj, false) reports pending/in-progress calls only through its return value
+        if (!UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false)) {
+            throw new RemoteException("Metadata node stub not unexported: calls are pending or in progress");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
new file mode 100644
index 0000000..dce10ab
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.result.ResultUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * HTTP endpoint that returns {@link AsterixClusterProperties#getClusterStateDescription()} as JSON.
+ */
+public class ClusterAPIServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Writes the cluster state description as a JSON response body with status 200. If building the
+     * description throws a JSONException, responds with status 500 and passes the exception and the
+     * response writer to {@link ResultUtils#apiErrorHandler}.
+     */
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        response.setContentType("application/json");
+        response.setCharacterEncoding("utf-8");
+        PrintWriter responseWriter = response.getWriter();
+        //set the status before writing the body: once the response is committed (its buffer
+        //flushed), the container ignores any later setStatus call
+        try {
+            JSONObject responseObject = AsterixClusterProperties.INSTANCE.getClusterStateDescription();
+            response.setStatus(HttpServletResponse.SC_OK);
+            responseWriter.write(responseObject.toString());
+        } catch (JSONException e) {
+            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            ResultUtils.apiErrorHandler(responseWriter, e);
+        }
+        responseWriter.flush();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index bee284d..adf0a4d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.api.http.servlet.APIServlet;
 import org.apache.asterix.api.http.servlet.AQLAPIServlet;
+import org.apache.asterix.api.http.servlet.ClusterAPIServlet;
 import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
 import org.apache.asterix.api.http.servlet.DDLAPIServlet;
 import org.apache.asterix.api.http.servlet.FeedServlet;
@@ -37,7 +38,7 @@ import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.utils.ServletUtil.Servlets;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
@@ -51,7 +52,6 @@ import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.management.ReplicationLifecycleListener;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -88,11 +88,9 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         }
 
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
-        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
-                (HyracksConnection) getNewHyracksClientConnection());
+        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager((HyracksConnection) getNewHyracksClientConnection());
 
-        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
-                GlobalRecoveryManager.INSTANCE);
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.INSTANCE);
 
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
@@ -118,13 +116,6 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
 
-        AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
-                .getReplicationProperties();
-        if (asterixRepliactionProperties.isReplicationEnabled()) {
-            ReplicationLifecycleListener.INSTANCE = new ReplicationLifecycleListener(asterixRepliactionProperties);
-            ClusterManager.INSTANCE.registerSubscriber(ReplicationLifecycleListener.INSTANCE);
-        }
-
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
         ccAppCtx.setMessageBroker(messageBroker);
     }
@@ -178,25 +169,32 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         jsonAPIServer.setHandler(context);
 
         // AQL rest APIs.
-        context.addServlet(new ServletHolder(new QueryAPIServlet(new AqlCompilationProvider())), "/query");
-        context.addServlet(new ServletHolder(new UpdateAPIServlet(new AqlCompilationProvider())), "/update");
-        context.addServlet(new ServletHolder(new DDLAPIServlet(new AqlCompilationProvider())), "/ddl");
-        context.addServlet(new ServletHolder(new AQLAPIServlet(new AqlCompilationProvider())), "/aql");
+        context.addServlet(new ServletHolder(new QueryAPIServlet(new AqlCompilationProvider())),
+                Servlets.AQL_QUERY.getPath());
+        context.addServlet(new ServletHolder(new UpdateAPIServlet(new AqlCompilationProvider())),
+                Servlets.AQL_UPDATE.getPath());
+        context.addServlet(new ServletHolder(new DDLAPIServlet(new AqlCompilationProvider())),
+                Servlets.AQL_DDL.getPath());
+        context.addServlet(new ServletHolder(new AQLAPIServlet(new AqlCompilationProvider())), Servlets.AQL.getPath());
 
         // SQL++ rest APIs.
-        context.addServlet(new ServletHolder(new QueryAPIServlet(new SqlppCompilationProvider())), "/query/sqlpp");
-        context.addServlet(new ServletHolder(new UpdateAPIServlet(new SqlppCompilationProvider())), "/update/sqlpp");
-        context.addServlet(new ServletHolder(new DDLAPIServlet(new SqlppCompilationProvider())), "/ddl/sqlpp");
-        context.addServlet(new ServletHolder(new AQLAPIServlet(new SqlppCompilationProvider())), "/sqlpp");
+        context.addServlet(new ServletHolder(new QueryAPIServlet(new SqlppCompilationProvider())),
+                Servlets.SQLPP_QUERY.getPath());
+        context.addServlet(new ServletHolder(new UpdateAPIServlet(new SqlppCompilationProvider())),
+                Servlets.SQLPP_UPDATE.getPath());
+        context.addServlet(new ServletHolder(new DDLAPIServlet(new SqlppCompilationProvider())),
+                Servlets.SQLPP_DDL.getPath());
+        context.addServlet(new ServletHolder(new AQLAPIServlet(new SqlppCompilationProvider())),
+                Servlets.SQLPP.getPath());
 
         // Other APIs.
-        context.addServlet(new ServletHolder(new QueryStatusAPIServlet()), "/query/status");
-        context.addServlet(new ServletHolder(new QueryResultAPIServlet()), "/query/result");
-        context.addServlet(new ServletHolder(new ConnectorAPIServlet()), "/connector");
-        context.addServlet(new ServletHolder(new ShutdownAPIServlet()), "/admin/shutdown");
-        context.addServlet(new ServletHolder(new VersionAPIServlet()), "/admin/version");
-
-        context.addServlet(new ServletHolder(new QueryServiceServlet()), "/query/service");
+        context.addServlet(new ServletHolder(new QueryStatusAPIServlet()), Servlets.QUERY_STATUS.getPath());
+        context.addServlet(new ServletHolder(new QueryResultAPIServlet()), Servlets.QUERY_RESULT.getPath());
+        context.addServlet(new ServletHolder(new QueryServiceServlet()), Servlets.QUERY_SERVICE.getPath());
+        context.addServlet(new ServletHolder(new ConnectorAPIServlet()), Servlets.CONNECTOR.getPath());
+        context.addServlet(new ServletHolder(new ShutdownAPIServlet()), Servlets.SHUTDOWN.getPath());
+        context.addServlet(new ServletHolder(new VersionAPIServlet()), Servlets.VERSION.getPath());
+        context.addServlet(new ServletHolder(new ClusterAPIServlet()), Servlets.CLUSTER_STATE.getPath());
     }
 
     private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 00b7391..4514fee 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -62,17 +62,15 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         t.start();
     }
 
-    public enum ClusterEventType {
-        NODE_JOIN,
-        NODE_FAILURE
-    }
-
     @Override
     public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("NC: " + nodeId + " joined");
         }
         AsterixClusterProperties.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+        //if metadata node rejoining, we need to rebind the proxy connection when it is active again.
+        MetadataManager.INSTANCE.rebindMetadataNode = !AsterixClusterProperties.INSTANCE.isMetadataNodeActive();
+
         Set<String> nodeAddition = new HashSet<String>();
         nodeAddition.add(nodeId);
         updateProgress(ClusterEventType.NODE_JOIN, nodeAddition);
@@ -90,17 +88,16 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
 
     }
 
+    @Override
     public void notifyNodeFailure(Set<String> deadNodeIds) {
         for (String deadNode : deadNodeIds) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            //if metadata node failed, we need to rebind the proxy connection when it joins again.
-            String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
-            if (deadNode.equals(metadataNode)) {
-                MetadataManager.INSTANCE.rebindMetadataNode = true;
-            }
             AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
+
+            //if metadata node failed, we need to rebind the proxy connection when it is active again
+            MetadataManager.INSTANCE.rebindMetadataNode = !AsterixClusterProperties.INSTANCE.isMetadataNodeActive();
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
         Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index dac9af5..fcb196d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.hyracks.bootstrap;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,7 +29,6 @@ import org.apache.asterix.api.common.AsterixAppRuntimeContext;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
@@ -71,8 +69,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private boolean isMetadataNode = false;
     private boolean stopInitiated = false;
     private SystemState systemState = SystemState.NEW_UNIVERSE;
-    private boolean performedRemoteRecovery = false;
-    private boolean replicationEnabled = false;
+    private boolean pendingFailbackCompletion = false;
     private IMessageBroker messageBroker;
 
     @Override
@@ -90,8 +87,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
         ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
-        messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
-        ncApplicationContext.setMessageBroker(messageBroker);
+
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -108,17 +104,14 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         }
         runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
+        messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
+        ncApplicationContext.setMessageBroker(messageBroker);
 
-        //If replication is enabled, check if there is a replica for this node
-        AsterixReplicationProperties asterixReplicationProperties = ((IAsterixPropertiesProvider) runtimeContext)
-                .getReplicationProperties();
-
-        replicationEnabled = asterixReplicationProperties.isReplicationEnabled();
-
+        boolean replicationEnabled = AsterixClusterProperties.INSTANCE.isReplicationEnabled();
+        boolean autoFailover = AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled();
         if (initialRun) {
             LOGGER.info("System is being initialized. (first run)");
         } else {
-            //#. recover if the system is corrupted by checking system state.
             IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
             systemState = recoveryMgr.getSystemState();
 
@@ -130,36 +123,41 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
                     //Try to perform remote recovery
                     IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
-                    remoteRecoveryMgr.performRemoteRecovery();
-                    performedRemoteRecovery = true;
-                    systemState = SystemState.HEALTHY;
+                    if (autoFailover) {
+                        remoteRecoveryMgr.startFailbackProcess();
+                        systemState = SystemState.RECOVERING;
+                        pendingFailbackCompletion = true;
+                    } else {
+                        remoteRecoveryMgr.performRemoteRecovery();
+                        systemState = SystemState.HEALTHY;
+                    }
+                }
+            } else {
+                //recover if the system is corrupted by checking system state.
+                if (systemState == SystemState.CORRUPTED) {
+                    recoveryMgr.startRecovery(true);
                 }
-            }
-
-            if (systemState == SystemState.CORRUPTED) {
-                recoveryMgr.startRecovery(true);
             }
         }
 
-        if (replicationEnabled) {
+        /**
+         * if the node pending failback completion, the replication channel
+         * should not be opened to avoid other nodes connecting to it before
+         * the node completes its failback. CC will notify other replicas once
+         * this node is ready to receive replication requests.
+         */
+        if (replicationEnabled && !pendingFailbackCompletion) {
             startReplicationService();
         }
     }
 
-    private void startReplicationService() throws IOException {
+    private void startReplicationService() {
         //Open replication channel
         runtimeContext.getReplicationChannel().start();
 
         //Check the state of remote replicas
         runtimeContext.getReplicationManager().initializeReplicasState();
 
-        if (performedRemoteRecovery) {
-            //Notify remote replicas about the new IP Address if changed
-            //Note: this is a hack since each node right now maintains its own copy of the cluster configuration.
-            //Once the configuration is centralized on the CC, this step wont be needed.
-            runtimeContext.getReplicationManager().broadcastNewIPAddress();
-        }
-
         //Start replication after the state of remote replicas has been initialized.
         runtimeContext.getReplicationManager().startReplicationThreads();
     }
@@ -211,10 +209,10 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         }
 
         isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
-        if (isMetadataNode) {
+        if (isMetadataNode && !pendingFailbackCompletion) {
             runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
         }
-        ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+        ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode && !pendingFailbackCompletion);
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting lifecycle components");
@@ -237,11 +235,13 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
         lccm.startAll();
 
-        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
+        if (!pendingFailbackCompletion) {
+            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+            recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
 
-        if (isMetadataNode) {
-            runtimeContext.exportMetadataNodeStub();
+            if (isMetadataNode) {
+                runtimeContext.exportMetadataNodeStub();
+            }
         }
 
         //Clean any temporary files

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index aeaef59..258bc35 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -22,9 +22,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
@@ -54,6 +57,9 @@ public class CCMessageBroker implements ICCMessageBroker {
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
         AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message: " + absMessage.getMessageType().name());
+        }
         switch (absMessage.getMessageType()) {
             case RESOURCE_ID_REQUEST:
                 handleResourceIdRequest(message, nodeId);
@@ -67,6 +73,12 @@ public class CCMessageBroker implements ICCMessageBroker {
             case TAKEOVER_METADATA_NODE_RESPONSE:
                 handleTakeoverMetadataNodeResponse(message);
                 break;
+            case PREPARE_PARTITIONS_FAILBACK_RESPONSE:
+                handleClosePartitionsResponse(message);
+                break;
+            case COMPLETE_FAILBACK_RESPONSE:
+                handleCompleteFailbcakResponse(message);
+                break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
                 break;
@@ -78,7 +90,7 @@ public class CCMessageBroker implements ICCMessageBroker {
         ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
         reponse.setId(msg.getId());
         //cluster is not active
-        if (!AsterixClusterProperties.isClusterActive()) {
+        if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
             reponse.setResourceId(-1);
             reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
         } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
@@ -126,4 +138,14 @@ public class CCMessageBroker implements ICCMessageBroker {
         TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
         AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
     }
+
+    private void handleCompleteFailbcakResponse(IMessage message) {
+        //forward the node's complete-failback response to AsterixClusterProperties
+        AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse((CompleteFailbackResponseMessage) message);
+    }
+
+    private void handleClosePartitionsResponse(IMessage message) {
+        AsterixClusterProperties.INSTANCE
+                .processPreparePartitionsFailbackResponse((PreparePartitionsFailbackResponseMessage) message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 8f8723e..0a0a917 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -21,9 +21,16 @@ package org.apache.asterix.messaging;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.common.messaging.ReplicaEventMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
 import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
@@ -32,18 +39,26 @@ import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class NCMessageBroker implements INCMessageBroker {
+    private final static Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
+
     private final NodeControllerService ncs;
     private final AtomicLong messageId = new AtomicLong(0);
     private final Map<Long, IApplicationMessageCallback> callbacks;
+    private final IAsterixAppRuntimeContext appContext;
 
     public NCMessageBroker(NodeControllerService ncs) {
         this.ncs = ncs;
+        appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
         callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
     }
 
@@ -67,56 +82,72 @@ public class NCMessageBroker implements INCMessageBroker {
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
-        //if the received message is a response to a sent message, deliver it to the sender
-        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-        if (callback != null) {
-            callback.deliverMessageResponse(absMessage);
-        }
+        try {
+            AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Received message: " + absMessage.getMessageType().name());
+            }
+            //a response to a message this node sent is delivered to the callback registered under its id
+            IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+            if (callback != null) {
+                callback.deliverMessageResponse(absMessage);
+            }
 
-        //handle requests from CC
-        switch (absMessage.getMessageType()) {
-            case REPORT_MAX_RESOURCE_ID_REQUEST:
-                reportMaxResourceId();
-                break;
-            case TAKEOVER_PARTITIONS_REQUEST:
-                handleTakeoverPartitons(message);
-                break;
-            case TAKEOVER_METADATA_NODE_REQUEST:
-                handleTakeoverMetadataNode(message);
-                break;
-            default:
-                break;
+            //then dispatch requests sent by the CC according to the message type
+            switch (absMessage.getMessageType()) {
+                case REPORT_MAX_RESOURCE_ID_REQUEST:
+                    reportMaxResourceId();
+                    break;
+                case TAKEOVER_PARTITIONS_REQUEST:
+                    handleTakeoverPartitons(message);
+                    break;
+                case TAKEOVER_METADATA_NODE_REQUEST:
+                    handleTakeoverMetadataNode(message);
+                    break;
+                case PREPARE_PARTITIONS_FAILBACK_REQUEST:
+                    handlePreparePartitionsFailback(message);
+                    break;
+                case COMPLETE_FAILBACK_REQUEST:
+                    handleCompleteFailbackRequest(message);
+                    break;
+                case REPLICA_EVENT:
+                    handleReplicaEvent(message);
+                    break;
+                default:
+                    break;
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed to handle received message", e);
+            throw e;
         }
     }
 
     private void handleTakeoverPartitons(IMessage message) throws Exception {
         TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
-        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
-                .getApplicationObject();
-        IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-        remoteRecoeryManager.takeoverPartitons(msg.getFailedNode(), msg.getPartitions());
-        //send response after takeover is completed
-        TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-        sendMessage(reponse, null);
+        try {
+            IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
+            remoteRecoveryManager.takeoverPartitons(msg.getPartitions());
+        } finally {
+            //respond from finally: the response is sent even if the takeover above threw
+            TakeoverPartitionsResponseMessage takeoverResponse = new TakeoverPartitionsResponseMessage(
+                    msg.getRequestId(), appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+            sendMessage(takeoverResponse, null);
+        }
     }
 
     private void handleTakeoverMetadataNode(IMessage message) throws Exception {
-        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
-                .getApplicationObject();
-        appContext.initializeMetadata(false);
-        appContext.exportMetadataNodeStub();
-        //send response after takeover is completed
-        TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
-                appContext.getTransactionSubsystem().getId());
-        sendMessage(reponse, null);
+        try {
+            appContext.initializeMetadata(false);
+            appContext.exportMetadataNodeStub();
+        } finally {
+            //the response is sent even if initializing or exporting the metadata node above threw
+            sendMessage(new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId()),
+                    null);
+        }
     }
 
     @Override
     public void reportMaxResourceId() throws Exception {
-        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
-                .getApplicationObject();
         ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
         //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
@@ -124,4 +155,58 @@ public class NCMessageBroker implements INCMessageBroker {
         maxResourceIdMsg.setMaxResourceId(maxResourceId);
         sendMessage(maxResourceIdMsg, null);
     }
+
+    private void handleReplicaEvent(IMessage message) {
+        ReplicaEventMessage msg = (ReplicaEventMessage) message;
+        Node node = new Node();
+        node.setId(msg.getNodeId());
+        node.setClusterIp(msg.getNodeIPAddress());
+        Replica replica = new Replica(node);
+        ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
+        appContext.getReplicationManager().reportReplicaEvent(event);
+    }
+
+    private void handlePreparePartitionsFailback(IMessage message) throws Exception {
+        PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
+        /**
+         * if the metadata partition will be failed back
+         * we need to flush and close all datasets including metadata datasets
+         * otherwise we need to close all non-metadata datasets and flush metadata datasets
+         * so that their memory components will be copied to the failing back node
+         */
+        if (msg.isReleaseMetadataNode()) {
+            appContext.getDatasetLifecycleManager().closeAllDatasets();
+            //remove the metadata node stub from RMI registry
+            appContext.unexportMetadataNodeStub();
+        } else {
+            //close all non-metadata datasets
+            appContext.getDatasetLifecycleManager().closeUserDatasets();
+            //flush the remaining metadata datasets that were not closed
+            appContext.getDatasetLifecycleManager().flushAllDatasets();
+        }
+
+        //mark the partitions to be closed as inactive
+        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+                .getLocalResourceRepository();
+        for (Integer partitionId : msg.getPartitions()) {
+            localResourceRepo.addInactivePartition(partitionId);
+        }
+
+        //send response after partitions prepared for failback
+        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
+                msg.getRequestId(), msg.getPartitions());
+        sendMessage(reponse, null);
+    }
+
+    private void handleCompleteFailbackRequest(IMessage message) throws Exception {
+        CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
+        try {
+            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+            remoteRecoeryManager.completeFailbackProcess();
+        } finally {
+            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
+                    msg.getRequestId(), msg.getPartitions());
+            sendMessage(reponse, null);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 975180b..496a10e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -55,8 +55,6 @@ public interface IAsterixAppRuntimeContext {
 
     public ILSMMergePolicyFactory getMetadataMergePolicyFactory();
 
-    public int getMetaDataIODeviceId();
-
     public IBufferCache getBufferCache();
 
     public IFileMapProvider getFileMapManager();
@@ -93,14 +91,23 @@ public interface IAsterixAppRuntimeContext {
 
     /**
      * Exports the metadata node to the metadata RMI port.
+     *
      * @throws RemoteException
      */
     public void exportMetadataNodeStub() throws RemoteException;
 
     /**
      * Initializes the metadata node and bootstraps the metadata.
+     *
      * @param newUniverse
      * @throws Exception
      */
     public void initializeMetadata(boolean newUniverse) throws Exception;
+
+    /**
+     * Unexports the metadata node from the RMI registry
+     *
+     * @throws RemoteException
+     */
+    public void unexportMetadataNodeStub() throws RemoteException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index 75f1f82..adf8e38 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -28,7 +28,8 @@ public interface IClusterManagementWork {
     public enum ClusterState {
         STARTING,
         ACTIVE,
-        UNUSABLE
+        UNUSABLE,
+        REBALANCING
     }
 
     public WorkType getClusterManagementWorkType();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 7e02faf..3b4617a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -94,4 +94,11 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
      * @return a list of all indexes that are open at the time of the call.
      */
     List<IndexInfo> getOpenIndexesInfo();
+
+    /**
+     * Flushes and closes all user datasets (non-metadata datasets)
+     *
+     * @throws HyracksDataException
+     */
+    void closeUserDatasets() throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index fa5b503..019c168 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -34,7 +34,7 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     private static int REPLICATION_DATAPORT_DEFAULT = 2000;
     private static int REPLICATION_FACTOR_DEFAULT = 1;
     private static int REPLICATION_TIME_OUT_DEFAULT = 15;
-
+    private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
@@ -88,10 +88,10 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         Set<Replica> remoteReplicas = new HashSet<Replica>();;
 
         int numberOfRemoteReplicas = getReplicationFactor() - 1;
-
         //Using chained-declustering
         if (cluster != null) {
             int nodeIndex = -1;
+            //find the node index in the cluster config
             for (int i = 0; i < cluster.getNode().size(); i++) {
                 Node node = cluster.getNode().get(i);
                 if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
@@ -106,19 +106,18 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
                 return null;
             }
 
+            //find nodes to the right of this node
             for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
                 remoteReplicas.add(getReplicaByNodeIndex(i));
-
                 if (remoteReplicas.size() == numberOfRemoteReplicas) {
                     break;
                 }
             }
 
+            //if not all remote replicas have been found, start from the beginning
             if (remoteReplicas.size() != numberOfRemoteReplicas) {
                 for (int i = 0; i < cluster.getNode().size(); i++) {
-
                     remoteReplicas.add(getReplicaByNodeIndex(i));
-
                     if (remoteReplicas.size() == numberOfRemoteReplicas) {
                         break;
                     }
@@ -194,4 +193,51 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
+
+    /**
+     * @param nodeId
+     * @return The set of nodes which replicate to this node, including the node itself
+     */
+    public Set<String> getNodeReplicationClients(String nodeId) {
+        Set<String> clientReplicas = new HashSet<>();
+        clientReplicas.add(nodeId);
+
+        int clientsCount = getReplicationFactor();
+
+        //Using chained-declustering backwards
+        if (cluster != null) {
+            int nodeIndex = -1;
+            //find the node index in the cluster config
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    nodeIndex = i;
+                    break;
+                }
+            }
+
+            //find nodes to the left of this node
+            for (int i = nodeIndex - 1; i >= 0; i--) {
+                clientReplicas.add(getReplicaByNodeIndex(i).getId());
+                if (clientReplicas.size() == clientsCount) {
+                    break;
+                }
+            }
+
+            //if not all client replicas have been found, start from the end
+            if (clientReplicas.size() != clientsCount) {
+                for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
+                    clientReplicas.add(getReplicaByNodeIndex(i).getId());
+                    if (clientReplicas.size() == clientsCount) {
+                        break;
+                    }
+                }
+            }
+        }
+        return clientReplicas;
+    }
+
+    public int getMaxRemoteRecoveryAttempts() {
+        return MAX_REMOTE_RECOVERY_ATTEMPTS;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 53902e1..0cd88d6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -75,7 +75,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
-        logRecord.setNodeId(logManager.getNodeId());
     }
 
     @Override
@@ -543,7 +542,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
         if (!dsInfo.isExternal) {
             synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, dsInfo.indexes.size());
+                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
+                        dsInfo.indexes.size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -612,13 +612,24 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public void closeAllDatasets() throws HyracksDataException {
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
+    public synchronized void closeAllDatasets() throws HyracksDataException {
+        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
+        for (DatasetInfo dsInfo : openDatasets) {
             closeDataset(dsInfo);
         }
     }
 
     @Override
+    public synchronized void closeUserDatasets() throws HyracksDataException {
+        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
+        for (DatasetInfo dsInfo : openDatasets) {
+            if (dsInfo.datasetID >= firstAvilableUserDatasetID) {
+                closeDataset(dsInfo);
+            }
+        }
+    }
+
+    @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
         if (dumpState) {
             dumpState(outputStream);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index d25e51f..afdbb31 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -65,7 +65,8 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         try {
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
+                    lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
new file mode 100644
index 0000000..2c72051
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    protected final long planId;
+    protected final int requestId;
+
+    public AbstractFailbackPlanMessage(long planId, int requestId) {
+        this.planId = planId;
+        this.requestId = requestId;
+    }
+
+    public long getPlanId() {
+        return planId;
+    }
+
+    public int getRequestId() {
+        return requestId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
new file mode 100644
index 0000000..510817a
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.COMPLETE_FAILBACK_REQUEST;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
new file mode 100644
index 0000000..6d77920
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.COMPLETE_FAILBACK_RESPONSE;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+}
\ No newline at end of file


Mime
View raw message