asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [4/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance
Date Tue, 26 Jan 2016 23:31:27 GMT
Asterix NCs Fault Tolerance

This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using proper error message.
- Basic replication test cases using vagrant virtual cluster for:
   1. LSM bulkload components replication.
   2. LSM Memory components replication and recovery.
   3. Metadata node takeover.
These test cases will be part of the cluster test profile.

Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/8fc8bf8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/8fc8bf8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/8fc8bf8b

Branch: refs/heads/master
Commit: 8fc8bf8b510bdc635f949f2eebf8b4d0d2a6b008
Parents: 5b068d2
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Sat Jan 23 22:26:59 2016 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Tue Jan 26 15:26:32 2016 -0800

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |  59 +++-
 .../bootstrap/AsterixGlobalRecoveryManager.java | 217 ------------
 .../bootstrap/CCApplicationEntryPoint.java      |  12 +-
 .../bootstrap/ClusterLifecycleListener.java     |  18 +-
 .../bootstrap/GlobalRecoveryManager.java        | 225 ++++++++++++
 .../bootstrap/NCApplicationEntryPoint.java      |  36 +-
 .../asterix/messaging/CCMessageBroker.java      |  26 +-
 .../asterix/messaging/NCMessageBroker.java      |  33 ++
 .../common/api/IAsterixAppRuntimeContext.java   |  14 +
 .../common/cluster/IGlobalRecoveryMaanger.java  |  29 ++
 .../config/AsterixMetadataProperties.java       |   4 +
 .../config/AsterixReplicationProperties.java    |  13 +-
 .../IAsterixApplicationContextInfo.java         |   3 +
 .../TakeoverMetadataNodeRequestMessage.java     |  29 ++
 .../TakeoverMetadataNodeResponseMessage.java    |  38 ++
 .../TakeoverPartitionsRequestMessage.java       |  72 ++++
 .../TakeoverPartitionsResponseMessage.java      |  50 +++
 .../messaging/api/IApplicationMessage.java      |   6 +-
 .../common/messaging/api/ICCMessageBroker.java  |  32 ++
 .../replication/IRemoteRecoveryManager.java     |  16 +
 .../replication/IReplicaResourcesManager.java   |  13 +-
 .../common/transactions/IRecoveryManager.java   |  10 +
 .../asterix/common/utils/StoragePathUtil.java   |  10 +-
 .../src/main/resources/schema/cluster.xsd       |   4 +-
 .../apache/asterix/test/aql/TestExecutor.java   |  27 ++
 .../asterix/event/util/PatternCreator.java      |  29 --
 asterix-installer/pom.xml                       |   4 +
 .../installer/command/ValidateCommand.java      |   6 -
 .../asterix/installer/test/ReplicationIT.java   | 257 ++++++++++++++
 .../clusterts/cluster_with_replication.xml      |  63 ++++
 .../src/test/resources/clusterts/known_hosts    |  10 +-
 .../failover/bulkload/bulkload.1.ddl.aql        |  54 +++
 .../failover/bulkload/bulkload.2.update.aql     |  32 ++
 .../failover/bulkload/bulkload.3.txnqbc.aql     |  31 ++
 .../bulkload/bulkload.4.vagrant_script.aql      |   1 +
 .../failover/bulkload/bulkload.5.sleep.aql      |   1 +
 .../failover/bulkload/bulkload.6.txnqar.aql     |  31 ++
 .../mem_component_recovery.1.ddl.aql            |  58 ++++
 .../mem_component_recovery.2.update.aql         |  34 ++
 .../mem_component_recovery.3.txnqbc.aql         |  32 ++
 .../mem_component_recovery.4.vagrant_script.aql |   1 +
 .../mem_component_recovery.5.sleep.aql          |   1 +
 .../mem_component_recovery.6.txnqar.aql         |  32 ++
 .../metadata_node/metadata_node.1.ddl.aql       |  55 +++
 .../metadata_node/metadata_node.2.txnqbc.aql    |  30 ++
 .../metadata_node.3.vagrant_script.aql          |   1 +
 .../metadata_node/metadata_node.4.sleep.aql     |   1 +
 .../metadata_node/metadata_node.5.txnqar.aql    |  32 ++
 .../integrationts/replication/testsuite.xml     |  37 ++
 .../metadata/bootstrap/MetadataBootstrap.java   |   6 +
 .../utils/SplitsAndConstraintsUtil.java         |   6 +
 .../asterix/om/util/AsterixAppContextInfo.java  |  19 +-
 .../om/util/AsterixClusterProperties.java       | 189 +++++++++-
 .../functions/AsterixReplicationProtocol.java   | 346 -------------------
 .../functions/ReplicationProtocol.java          | 346 +++++++++++++++++++
 .../logging/ReplicationLogBuffer.java           |   4 +-
 .../management/ReplicaEventNotifier.java        |   6 +-
 .../management/ReplicaStateChecker.java         |   4 +-
 .../management/ReplicationChannel.java          | 123 +++----
 .../management/ReplicationManager.java          |  92 +++--
 .../recovery/RemoteRecoveryManager.java         |  31 +-
 .../replication/storage/AsterixFilesUtil.java   |  78 -----
 .../storage/AsterixLSMIndexFileProperties.java  | 191 ----------
 .../storage/LSMComponentProperties.java         |  33 +-
 .../storage/LSMIndexFileProperties.java         | 162 +++++++++
 .../storage/ReplicaResourcesManager.java        | 331 ++++++++----------
 .../src/test/resources/data/fbu.adm             |  10 +
 .../test/resources/scripts/delete_storage.sh    |  18 +
 .../test/resources/scripts/kill_cc_and_nc.sh    |  18 +
 .../PersistentLocalResourceRepository.java      |  73 ++--
 ...ersistentLocalResourceRepositoryFactory.java |   8 +-
 .../service/recovery/RecoveryManager.java       | 273 +++++++++++++--
 72 files changed, 2814 insertions(+), 1382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 6d0b321..8a40876 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -46,9 +49,14 @@ import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
@@ -85,6 +93,7 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+    private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
 
     private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
 
@@ -128,8 +137,9 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
     private IReplicaResourcesManager replicaResourcesManager;
+    private final int metadataRmiPort;
 
-    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
         this.ncApplicationContext = ncApplicationContext;
         compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
         externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -140,6 +150,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
         replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
                 AsterixClusterProperties.INSTANCE.getCluster());
+        this.metadataRmiPort = metadataRmiPort;
     }
 
     public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
@@ -159,7 +170,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId());
+                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+
         localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -186,9 +198,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         if (replicationProperties.isReplicationEnabled()) {
             String nodeId = ncApplicationContext.getNodeId();
 
-            replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
-                    replicationProperties.getReplicationStore());
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -225,14 +235,14 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
          * to process any logs that might be generated during stopping these components
          */
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
-        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
-         * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
-         * during closing datasets are sent to remote replicas
+         * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+         * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
          */
         if (replicationManager != null) {
             lccm.register(replicationManager);
         }
+        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
          * Stopping indexLifecycleManager will flush and close all datasets.
          */
@@ -380,4 +390,35 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     public void initializeResourceIdFactory() throws HyracksDataException {
         resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
+
+    @Override
+    public void initializeMetadata(boolean newUniverse) throws Exception {
+        IAsterixStateProxy proxy = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bootstrapping metadata");
+        }
+        MetadataNode.INSTANCE.initialize(this);
+
+        proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+        if (proxy == null) {
+            throw new IllegalStateException("Metadata node cannot access distributed state");
+        }
+
+        // This is a special case, we just give the metadataNode directly.
+        // This way we can delay the registration of the metadataNode until
+        // it is completely initialized.
+        MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
+        MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+        MetadataBootstrap.startDDLRecovery();
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Metadata node bound");
+        }
+    }
+
+    @Override
+    public void exportMetadataNodeStub() throws RemoteException {
+        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+        ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
deleted file mode 100644
index 4fae7e9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.hyracks.bootstrap;
-
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.MetadataConstants;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.feed.CentralFeedManager;
-import org.apache.asterix.file.ExternalIndexingOperations;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
-
-    private static ClusterState state;
-    private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
-    private HyracksConnection hcc;
-    public static AsterixGlobalRecoveryManager INSTANCE;
-
-    public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
-        state = AsterixClusterProperties.INSTANCE.getState();
-        this.hcc = hcc;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        state = AsterixClusterProperties.INSTANCE.getState();
-        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
-        return null;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        // perform global recovery if state changed to active
-        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
-        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
-        if (needToRecover) {
-            Thread recoveryThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    LOGGER.info("Starting AsterixDB's Global Recovery");
-                    MetadataTransactionContext mdTxnCtx = null;
-                    try {
-                        Thread.sleep(4000);
-                        MetadataManager.INSTANCE.init();
-                        // Loop over datasets
-                        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                        List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
-                        for (Dataverse dataverse : dataverses) {
-                            if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
-                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
-                                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
-                                        dataverse.getDataverseName());
-                                for (Dataset dataset : datasets) {
-                                    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-                                        // External dataset
-                                        // Get indexes
-                                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
-                                                dataset.getDataverseName(), dataset.getDatasetName());
-                                        if (indexes.size() > 0) {
-                                            // Get the state of the dataset
-                                            ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
-                                                    .getDatasetDetails();
-                                            ExternalDatasetTransactionState datasetState = dsd.getState();
-                                            if (datasetState == ExternalDatasetTransactionState.BEGIN) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if persumed abort, roll backward
-                                                // 1. delete all pending files
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                    }
-                                                }
-                                                // 2. clean artifacts in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
-                                                        dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(ExternalDatasetTransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                                            } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if ready to commit, roll forward
-                                                // 1. commit indexes in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
-                                                        dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 2. add pending files in metadata
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                        file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-                                                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName().equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                break;
-                                                            }
-                                                        }
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName().equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                originalFile.setSize(file.getSize());
-                                                                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                            }
-                                                        }
-                                                    }
-                                                }
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(ExternalDatasetTransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e) {
-                        // This needs to be fixed <-- Needs to shutdown the system -->
-                        /*
-                         * Note: Throwing this illegal state exception will terminate this thread
-                         * and feeds listeners will not be notified.
-                         */
-                        LOGGER.severe("Global recovery was not completed successfully" + e);
-                        try {
-                            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                        } catch (Exception e1) {
-                            if (LOGGER.isLoggable(Level.SEVERE)) {
-                                LOGGER.severe("Exception in aborting" + e.getMessage());
-                            }
-                            throw new IllegalStateException(e1);
-                        }
-                    }
-                    AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
-                    LOGGER.info("Global Recovery Completed");
-                }
-            });
-            state = newState;
-            recoveryThread.start();
-        }
-        return null;
-    }
-
-    private void executeHyracksJob(JobSpecification spec) throws Exception {
-        spec.setMaxReattempts(0);
-        JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        // Do nothing
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        // Do nothing?
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 2a7b3e4..f2fc9bf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -79,7 +79,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
-        messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
+        messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,7 +87,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         }
 
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
-        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
+                (HyracksConnection) getNewHyracksClientConnection());
+
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
+                GlobalRecoveryManager.INSTANCE);
 
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
@@ -111,9 +115,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         centralFeedManager = CentralFeedManager.getInstance();
         centralFeedManager.start();
 
-        AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
-                (HyracksConnection) getNewHyracksClientConnection());
-        ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+        ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
 
         AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
                 .getReplicationProperties();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 9505692..00b7391 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -95,19 +95,12 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
-
             //if metadata node failed, we need to rebind the proxy connection when it joins again.
-            //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
-            if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
-                String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
-                String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
-                String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
-                if (deadNode.equals(completeMetadataNodeName)) {
-                    MetadataManager.INSTANCE.rebindMetadataNode = true;
-                }
+            String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
+            if (deadNode.equals(metadataNode)) {
+                MetadataManager.INSTANCE.rebindMetadataNode = true;
             }
-
+            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
         Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
@@ -166,7 +159,8 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
                 case REMOVE_NODE:
                     nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
                     nodeRemovalRequests.add(w);
-                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w,
+                            Status.IN_PROGRESS);
                     pendingWorkResponses.add(response);
                     break;
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
new file mode 100644
index 0000000..2bac1cf
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.hyracks.bootstrap;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.file.ExternalIndexingOperations;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Cluster events subscriber that performs AsterixDB's global recovery of external datasets.
+ * <p>
+ * Recovery is triggered from {@link #notifyNodeJoin(String)} / {@link #startGlobalRecovery()} and runs on its own
+ * thread whenever the cluster state differs from the last state recorded here and is {@link ClusterState#ACTIVE}.
+ * For every external dataset outside the metadata dataverse that has at least one index, an unfinished external
+ * dataset transaction is either rolled back (state {@code BEGIN}) or rolled forward (state
+ * {@code READY_TO_COMMIT}); the dataset state is then set to {@code COMMIT}.
+ */
+public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
+
+    // Last cluster state recorded by this manager; compared with the current state to detect a change.
+    private static ClusterState state;
+    private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
+    // Connection used to submit the abort/recover jobs built during recovery.
+    private final HyracksConnection hcc;
+    public static GlobalRecoveryManager INSTANCE;
+
+    /**
+     * Creates the manager and resets the recorded cluster state to {@link ClusterState#UNUSABLE}.
+     *
+     * @param hcc
+     *            the connection used to run recovery jobs
+     * @throws Exception
+     *             declared for callers; nothing in this constructor throws a checked exception
+     */
+    public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+        state = ClusterState.UNUSABLE;
+        this.hcc = hcc;
+    }
+
+    /**
+     * Records the current cluster state and marks global recovery as not completed.
+     *
+     * @return always {@code null} (no cluster management work is requested)
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+        state = AsterixClusterProperties.INSTANCE.getState();
+        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
+        return null;
+    }
+
+    /**
+     * Attempts to start global recovery when a node joins.
+     *
+     * @return always {@code null} (no cluster management work is requested)
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        startGlobalRecovery();
+        return null;
+    }
+
+    /**
+     * Runs the given job without re-attempts and blocks until it completes.
+     */
+    private void executeHyracksJob(JobSpecification spec) throws Exception {
+        spec.setMaxReattempts(0);
+        JobId jobId = hcc.startJob(spec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // Do nothing
+    }
+
+    @Override
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        // Do nothing?
+    }
+
+    /**
+     * Starts global recovery on a new thread if the cluster state changed to ACTIVE since the last recorded state.
+     * The new state is recorded before the thread is started.
+     */
+    @Override
+    public void startGlobalRecovery() {
+        // perform global recovery if state changed to active
+        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+        if (needToRecover) {
+            Thread recoveryThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    runGlobalRecovery();
+                }
+            });
+            state = newState;
+            recoveryThread.start();
+        }
+    }
+
+    /**
+     * Body of the recovery thread: walks all non-metadata dataverses and recovers their external datasets, then
+     * marks global recovery as completed.
+     */
+    private void runGlobalRecovery() {
+        LOGGER.info("Starting AsterixDB's Global Recovery");
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            // NOTE(review): fixed 4 second delay before touching the metadata; the reason is not documented here.
+            Thread.sleep(4000);
+            MetadataManager.INSTANCE.init();
+            // Loop over datasets
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+            for (Dataverse dataverse : dataverses) {
+                if (dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
+                    continue;
+                }
+                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
+                        CentralFeedManager.getInstance());
+                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                        dataverse.getDataverseName());
+                for (Dataset dataset : datasets) {
+                    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+                        // a recovered dataset commits its transaction, so continue with the one returned
+                        mdTxnCtx = recoverExternalDataset(mdTxnCtx, metadataProvider, dataset);
+                    }
+                }
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // This needs to be fixed <-- Needs to shutdown the system -->
+            // Until then the failure is only logged and recovery is still reported as completed below.
+            LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully", e);
+            try {
+                // nothing to abort if the failure happened before a metadata transaction was begun
+                if (mdTxnCtx != null) {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                }
+            } catch (Exception e1) {
+                LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
+                /*
+                 * Note: Throwing this illegal state exception will terminate this thread
+                 * and global recovery will not be marked as completed.
+                 */
+                throw new IllegalStateException(e1);
+            } finally {
+                if (e instanceof InterruptedException) {
+                    // restore the interrupt status consumed by the broad catch
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
+        LOGGER.info("Global Recovery Completed");
+    }
+
+    /**
+     * Recovers one external dataset if it has indexes and an unfinished transaction state.
+     *
+     * @return the metadata transaction to continue with: a freshly begun one if the dataset was recovered (its
+     *         changes are committed here), otherwise {@code mdTxnCtx} unchanged
+     */
+    private MetadataTransactionContext recoverExternalDataset(MetadataTransactionContext mdTxnCtx,
+            AqlMetadataProvider metadataProvider, Dataset dataset) throws Exception {
+        // Get indexes
+        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(),
+                dataset.getDatasetName());
+        if (indexes.isEmpty()) {
+            return mdTxnCtx;
+        }
+        // Get the state of the dataset
+        ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        ExternalDatasetTransactionState datasetState = dsd.getState();
+        if (datasetState == ExternalDatasetTransactionState.BEGIN) {
+            // if presumed abort, roll backward
+            rollBackward(mdTxnCtx, metadataProvider, dataset, indexes);
+        } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
+            // if ready to commit, roll forward
+            rollForward(mdTxnCtx, metadataProvider, dataset, indexes);
+        } else {
+            return mdTxnCtx;
+        }
+        // 3. correct the dataset state
+        ((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(ExternalDatasetTransactionState.COMMIT);
+        MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        return MetadataManager.INSTANCE.beginTransaction();
+    }
+
+    /**
+     * Rolls an external dataset in state BEGIN backward: drops its pending files from the metadata and runs the
+     * abort job.
+     */
+    private void rollBackward(MetadataTransactionContext mdTxnCtx, AqlMetadataProvider metadataProvider,
+            Dataset dataset, List<Index> indexes) throws Exception {
+        List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+        // 1. delete all pending files
+        for (ExternalFile file : files) {
+            if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+            }
+        }
+        // 2. clean artifacts in NCs
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
+        executeHyracksJob(jobSpec);
+    }
+
+    /**
+     * Rolls an external dataset in state READY_TO_COMMIT forward: runs the recover job and then applies the
+     * pending add/drop/append file operations to the metadata.
+     */
+    private void rollForward(MetadataTransactionContext mdTxnCtx, AqlMetadataProvider metadataProvider,
+            Dataset dataset, List<Index> indexes) throws Exception {
+        List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+        // 1. commit indexes in NCs
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
+        executeHyracksJob(jobSpec);
+        // 2. add pending files in metadata
+        for (ExternalFile file : files) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
+                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                // find original file
+                for (ExternalFile originalFile : files) {
+                    if (originalFile.getFileName().equals(file.getFileName())) {
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, originalFile);
+                        break;
+                    }
+                }
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                // find original file
+                // NOTE(review): unlike the drop case this loop has no break, so every entry with the same
+                // file name is processed; confirm whether that is intended.
+                for (ExternalFile originalFile : files) {
+                    if (originalFile.getFileName().equals(file.getFileName())) {
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, originalFile);
+                        originalFile.setSize(file.getSize());
+                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, originalFile);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 3341387..dac9af5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,7 +20,6 @@ package org.apache.asterix.hyracks.bootstrap;
 
 import java.io.File;
 import java.io.IOException;
-import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +41,6 @@ import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.IAsterixStateProxy;
-import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -102,7 +97,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
         }
 
-        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -215,33 +210,12 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
-        IAsterixStateProxy proxy = null;
         isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
         if (isMetadataNode) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Bootstrapping metadata");
-            }
-            MetadataNode.INSTANCE.initialize(runtimeContext);
-
-            proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
-            if (proxy == null) {
-                throw new IllegalStateException("Metadata node cannot access distributed state");
-            }
-
-            //This is a special case, we just give the metadataNode directly.
-            //This way we can delay the registration of the metadataNode until
-            //it is completely initialized.
-            MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
-            MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
-                    systemState == SystemState.NEW_UNIVERSE);
-            MetadataBootstrap.startDDLRecovery();
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Metadata node bound");
-            }
+            runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
         }
-
         ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting lifecycle components");
         }
@@ -267,9 +241,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
 
         if (isMetadataNode) {
-            IMetadataNode stub = null;
-            stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
-            proxy.setMetadataNode(stub);
+            runtimeContext.exportMetadataNodeStub();
         }
 
         //Clean any temporary files

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 095ef1b..aeaef59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -29,14 +29,17 @@ import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
-import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 
-public class CCMessageBroker implements IMessageBroker {
+public class CCMessageBroker implements ICCMessageBroker {
 
     private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
     private final AtomicLong globalResourceId = new AtomicLong(0);
@@ -58,6 +61,12 @@ public class CCMessageBroker implements IMessageBroker {
             case REPORT_MAX_RESOURCE_ID_RESPONSE:
                 handleReportResourceMaxIdResponse(message, nodeId);
                 break;
+            case TAKEOVER_PARTITIONS_RESPONSE:
+                handleTakeoverPartitionsResponse(message);
+                break;
+            case TAKEOVER_METADATA_NODE_RESPONSE:
+                handleTakeoverMetadataNodeResponse(message);
+                break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
                 break;
@@ -89,7 +98,8 @@ public class CCMessageBroker implements IMessageBroker {
         nodesReportedMaxResourceId.add(nodeId);
     }
 
-    private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+    @Override
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         NodeControllerState state = nodeMap.get(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
@@ -106,4 +116,14 @@ public class CCMessageBroker implements IMessageBroker {
             }
         }
     }
+
+    /**
+     * Passes a partitions takeover response received from an NC on to the cluster properties.
+     */
+    private void handleTakeoverPartitionsResponse(IMessage message) {
+        final TakeoverPartitionsResponseMessage takeoverResponse = (TakeoverPartitionsResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(takeoverResponse);
+    }
+
+    /**
+     * Passes a metadata node takeover response received from an NC on to the cluster properties.
+     */
+    private void handleTakeoverMetadataNodeResponse(IMessage message) {
+        final TakeoverMetadataNodeResponseMessage takeoverResponse = (TakeoverMetadataNodeResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(takeoverResponse);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 001771e..8f8723e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -25,9 +25,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -75,11 +79,40 @@ public class NCMessageBroker implements INCMessageBroker {
             case REPORT_MAX_RESOURCE_ID_REQUEST:
                 reportMaxResourceId();
                 break;
+            case TAKEOVER_PARTITIONS_REQUEST:
+                handleTakeoverPartitons(message);
+                break;
+            case TAKEOVER_METADATA_NODE_REQUEST:
+                handleTakeoverMetadataNode(message);
+                break;
             default:
                 break;
         }
     }
 
+    /**
+     * Serves a partitions takeover request: lets the remote recovery manager take over the requested partitions
+     * of the failed node and then reports back with a response message.
+     */
+    private void handleTakeoverPartitons(IMessage message) throws Exception {
+        final TakeoverPartitionsRequestMessage request = (TakeoverPartitionsRequestMessage) message;
+        final IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        final IRemoteRecoveryManager recoveryManager = runtimeCtx.getRemoteRecoveryManager();
+        recoveryManager.takeoverPartitons(request.getFailedNode(), request.getPartitions());
+        // the response is only sent once the takeover call has returned
+        final TakeoverPartitionsResponseMessage response = new TakeoverPartitionsResponseMessage(
+                request.getRequestId(), runtimeCtx.getTransactionSubsystem().getId(), request.getPartitions());
+        sendMessage(response, null);
+    }
+
+    /**
+     * Serves a metadata node takeover request: initializes the metadata on this node (not as a new universe),
+     * exports the metadata node stub and then reports back with a response message.
+     */
+    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+        final IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        runtimeCtx.initializeMetadata(false);
+        runtimeCtx.exportMetadataNodeStub();
+        // the response is only sent once initialization and export have returned
+        final TakeoverMetadataNodeResponseMessage response = new TakeoverMetadataNodeResponseMessage(
+                runtimeCtx.getTransactionSubsystem().getId());
+        sendMessage(response, null);
+    }
+
     @Override
     public void reportMaxResourceId() throws Exception {
         IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -89,4 +90,17 @@ public interface IAsterixAppRuntimeContext {
     public IReplicationChannel getReplicationChannel();
 
     public void initializeResourceIdFactory() throws HyracksDataException;
+
+    /**
+     * Exports the metadata node to the metadata RMI port.
+     *
+     * @throws RemoteException
+     *             if exporting the metadata node fails
+     */
+    public void exportMetadataNodeStub() throws RemoteException;
+
+    /**
+     * Initializes the metadata node and bootstraps the metadata.
+     *
+     * @param newUniverse
+     *            whether the metadata is bootstrapped for a new universe; NC startup passes
+     *            {@code systemState == SystemState.NEW_UNIVERSE}, while the metadata node takeover handler
+     *            passes {@code false}
+     * @throws Exception
+     *             if initialization or bootstrapping fails (NOTE(review): the concrete failure types are not
+     *             visible from this interface)
+     */
+    public void initializeMetadata(boolean newUniverse) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+    /**
+     * Kicks off the global recovery process when the cluster state has changed to ACTIVE.
+     */
+    void startGlobalRecovery();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return accessor.getClusterPartitions();
     }
+
+    public Map<String, String> getTransactionLogDirs() {
+        return accessor.getTransactionLogDirs();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     private static int REPLICATION_TIME_OUT_DEFAULT = 15;
 
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
-    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
@@ -102,8 +101,8 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
             }
 
             if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
-                        + " in cluster configurations");
+                LOGGER.log(Level.WARNING,
+                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
                 return null;
             }
 
@@ -179,13 +178,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         return replicaIds;
     }
 
-    public String getReplicationStore() {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationStore();
-        }
-        return REPLICATION_STORE_DEFAULT;
-    }
-
     public int getReplicationFactor() {
         if (cluster != null) {
             if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@ public interface IAsterixApplicationContextInfo {
      * @return ICCApplicationContext implementation instance
      */
     public ICCApplicationContext getCCApplicationContext();
+
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+
+    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String failedNode;
+    private final long requestId;
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+            Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.failedNode = failedNode;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getFailedNode() {
+        return failedNode;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        // Renders "Request ID: <id> Node ID: <node> Failed Node: <node> Partitions: p1,p2,...".
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Failed Node: ").append(failedNode);
+        sb.append(" Partitions: ");
+        for (int i = 0; i < partitions.length; i++) {
+            // Separator goes before every element but the first, so no trailing comma is ever written.
+            sb.append(i == 0 ? "" : ",").append(partitions[i]);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@ public interface IApplicationMessage extends IMessage {
         RESOURCE_ID_REQUEST,
         RESOURCE_ID_RESPONSE,
         REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE
+        REPORT_MAX_RESOURCE_ID_RESPONSE,
+        TAKEOVER_PARTITIONS_REQUEST,
+        TAKEOVER_PARTITIONS_RESPONSE,
+        TAKEOVER_METADATA_NODE_REQUEST,
+        TAKEOVER_METADATA_NODE_RESPONSE
     }
 
     public abstract ApplicationMessageType getMessageType();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     * @param msg the application message to send
+     * @param nodeId the id of the node (NC) that should receive {@code msg}
+     * @throws Exception if sending fails (exact failure modes depend on the implementation; not visible here)
+     */
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
 public interface IRemoteRecoveryManager {
 
+    /**
+     * Attempts to perform the remote recovery process from an active remote replica.
+     */
     public void performRemoteRecovery();
 
+    /**
+     * Performs the partitions takeover process from the {@code failedNode}
+     * @param failedNode the id of the failed node whose partitions are being taken over
+     * @param partitions the partitions to take over
+     * @throws IOException presumably on an I/O failure during the takeover; confirm against implementations
+     * @throws ACIDException presumably on a transaction/recovery failure; confirm against implementations
+     */
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@ import java.util.Set;
 
 public interface IReplicaResourcesManager {
 
-    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
-    public String getLocalStorageFolder();
-
+    /**
+     * @param remoteNodes the set of remote nodes whose indexes are considered
+     * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+     */
     public long getMinRemoteLSN(Set<String> remoteNodes);
 
+    /**
+     * @param partitions the partitions whose indexes are considered
+     * @return the minimum LSN of all indexes that belong to {@code partitions}.
+     */
+    public long getPartitionsMinLSN(Integer[] partitions);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@ public interface IRecoveryManager {
      * @throws HyracksDataException
      */
     public long getLocalMinFirstLSN() throws HyracksDataException;
+
+    /**
+     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+     * @param partitions the partitions whose logs are replayed
+     * @param lowWaterMarkLSN the LSN from which the replay starts
+     * @param failedNode presumably the node that previously owned {@code partitions}; confirm against implementations
+     * @throws IOException presumably if the logs cannot be read; confirm against implementations
+     * @throws ACIDException presumably if replaying a log record fails; confirm against implementations
+     */
+    public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@ public class StoragePathUtil {
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    }
+
+    public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+        return dataverseName + File.separator + fullIndexName;
+    }
+
+    private static String prepareFullIndexName(String datasetName, String idxName) {
+        return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
 	<xs:element name="enabled" type="xs:boolean" />
 	<xs:element name="replication_port" type="xs:integer" />
 	<xs:element name="replication_factor" type="xs:integer" />
-	<xs:element name="replication_store" type="xs:string" />
+	<xs:element name="auto_failover" type="xs:boolean" />
 	<xs:element name="replication_time_out" type="xs:integer" />
 
 	<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
 				<xs:element ref="cl:enabled" />
 				<xs:element ref="cl:replication_port" />
 				<xs:element ref="cl:replication_factor" />
-				<xs:element ref="cl:replication_store" />
+				<xs:element ref="cl:auto_failover" />
 				<xs:element ref="cl:replication_time_out" />
 			</xs:sequence>
 		</xs:complexType>


Mime
View raw message