asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [6/6] incubator-asterixdb git commit: Introducing Data Replication To AsterixDB
Date Thu, 12 Nov 2015 20:51:33 GMT
Introducing Data Replication To AsterixDB

This change includes the following:
- Add data replication properties to cluster properties and Managix validate command.
- Introduce Data Replication components.
- Add data replication required fields to LogRecord.
- Specialized LogManager for data replication.
- Fix for invalid cluster state on node failure.
- ASTERIXDB-139: Fix for cleaning workspace files on startup/shutdown.
- Fix for temp datasets storage reclamation.
- Allow MetadataNode rebinding with CC.
- Add flag to checkpoint to identify sharp checkpoints.
- ASTERIXDB-1170: Fix shutdown sequence

Change-Id: I729fdd1144dbc9ff039b4bc414494860d7553810
Reviewed-on: https://asterix-gerrit.ics.uci.edu/338
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/209f3907
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/209f3907
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/209f3907

Branch: refs/heads/master
Commit: 209f39075cc2e61e14406d215d19de33fb37af81
Parents: 7c7e856
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Wed Nov 11 22:11:28 2015 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Thu Nov 12 12:47:49 2015 -0800

----------------------------------------------------------------------
 .../operators/physical/CommitRuntime.java       |    1 +
 asterix-app/pom.xml                             |   23 +-
 .../api/common/AsterixAppRuntimeContext.java    |  108 +-
 .../bootstrap/CCApplicationEntryPoint.java      |   20 +-
 .../bootstrap/ClusterLifecycleListener.java     |   13 +
 .../bootstrap/NCApplicationEntryPoint.java      |   96 +-
 asterix-common/pom.xml                          |   16 +-
 .../common/api/IAsterixAppRuntimeContext.java   |   14 +
 .../common/api/IDatasetLifecycleManager.java    |   16 +
 .../config/AsterixReplicationProperties.java    |  206 +++
 .../config/IAsterixPropertiesProvider.java      |    4 +-
 .../common/context/BaseOperationTracker.java    |    6 +-
 .../common/context/DatasetLifecycleManager.java |   63 +-
 .../context/PrimaryIndexOperationTracker.java   |   11 +-
 .../common/dataflow/AsterixLSMIndexUtil.java    |    6 +
 .../AbstractLSMIOOperationCallback.java         |    3 +
 .../LSMBTreeIOOperationCallback.java            |   10 +
 .../LSMBTreeWithBuddyIOOperationCallback.java   |   11 +
 .../LSMInvertedIndexIOOperationCallback.java    |   10 +
 .../LSMRTreeIOOperationCallback.java            |   11 +
 .../replication/AsterixReplicationJob.java      |   37 +
 .../replication/IRemoteRecoveryManager.java     |   25 +
 .../replication/IReplicaResourcesManager.java   |   34 +
 .../common/replication/IReplicationChannel.java |   37 +
 .../common/replication/IReplicationManager.java |  150 +++
 .../common/replication/IReplicationThread.java  |   32 +
 .../asterix/common/replication/Replica.java     |   92 ++
 .../common/replication/ReplicaEvent.java        |   76 ++
 .../asterix/common/transactions/ILogBuffer.java |    2 +
 .../common/transactions/ILogManager.java        |   45 +-
 .../asterix/common/transactions/ILogRecord.java |   34 +-
 .../common/transactions/IRecoveryManager.java   |   21 +
 .../asterix/common/transactions/LogRecord.java  |  313 ++++-
 .../asterix/common/transactions/LogSource.java  |   46 +
 .../src/main/resources/schema/cluster.xsd       |   19 +-
 .../asterix/event/util/PatternCreator.java      |   28 +
 .../installer/command/ValidateCommand.java      |   75 ++
 asterix-lang-aql/pom.xml                        |    4 +-
 asterix-lang-sqlpp/pom.xml                      |    4 +-
 .../asterix/metadata/MetadataManager.java       |   15 +-
 .../metadata/declared/AqlMetadataProvider.java  |    4 +-
 .../asterix/om/util/AsterixAppContextInfo.java  |    9 +
 .../om/util/AsterixClusterProperties.java       |    8 +-
 asterix-replication/pom.xml                     |   70 +
 .../functions/AsterixReplicationProtocol.java   |  346 +++++
 .../functions/ReplicaFilesRequest.java          |   60 +
 .../functions/ReplicaIndexFlushRequest.java     |   60 +
 .../functions/ReplicaLogsRequest.java           |   71 +
 .../replication/logging/RemoteLogMapping.java   |   72 +
 .../logging/ReplicationLogBuffer.java           |  158 +++
 .../logging/ReplicationLogFlusher.java          |  104 ++
 .../replication/management/NetworkingUtil.java  |  110 ++
 .../management/ReplicaEventNotifier.java        |  109 ++
 .../management/ReplicaStateChecker.java         |   86 ++
 .../management/ReplicationChannel.java          |  640 +++++++++
 .../ReplicationLifecycleListener.java           |   77 ++
 .../management/ReplicationManager.java          | 1247 ++++++++++++++++++
 .../recovery/RemoteRecoveryManager.java         |  208 +++
 .../replication/storage/AsterixFilesUtil.java   |   78 ++
 .../storage/AsterixLSMIndexFileProperties.java  |  189 +++
 .../storage/LSMComponentLSNSyncTask.java        |   45 +
 .../storage/LSMComponentProperties.java         |  203 +++
 .../storage/ReplicaResourcesManager.java        |  415 ++++++
 ...tractIndexModificationOperationCallback.java |    1 +
 .../PersistentLocalResourceRepository.java      |   52 +-
 .../service/locking/ConcurrentLockManager.java  |  159 ++-
 .../management/service/logging/LogBuffer.java   |   93 +-
 .../management/service/logging/LogManager.java  |   49 +-
 .../logging/LogManagerWithReplication.java      |  131 ++
 .../service/recovery/CheckpointObject.java      |   10 +-
 .../service/recovery/RecoveryManager.java       |  415 +++++-
 .../service/transaction/TransactionContext.java |    1 +
 .../transaction/TransactionSubsystem.java       |   18 +-
 pom.xml                                         |    1 +
 74 files changed, 6725 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index dfec32a..6f3078e 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -72,6 +72,7 @@ public class CommitRuntime implements IPushRuntime {
         this.isWriteTransaction = isWriteTransaction;
         this.longHashes = new long[2];
         this.logRecord = new LogRecord();
+        logRecord.setNodeId(logMgr.getNodeId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-app/pom.xml b/asterix-app/pom.xml
index df9a0e5..66382e6 100644
--- a/asterix-app/pom.xml
+++ b/asterix-app/pom.xml
@@ -25,15 +25,14 @@
     </parent>
     <artifactId>asterix-app</artifactId>
 
-    <licenses>
-        <license>
-            <name>Apache License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-            <distribution>repo</distribution>
-            <comments>A business-friendly OSS license</comments>
-        </license>
-    </licenses>
-
+	<licenses>
+		<license>
+			<name>Apache License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+			<distribution>repo</distribution>
+			<comments>A business-friendly OSS license</comments>
+		</license>
+	</licenses>
     <build>
         <plugins>
             <plugin>
@@ -216,5 +215,11 @@
             <version>1.2.2</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-replication</artifactId>
+            <version>0.8.8-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 2e7c23f..15252e9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -25,12 +25,13 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
@@ -39,10 +40,20 @@ import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.feeds.api.IFeedManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationChannel;
+import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.feeds.FeedManager;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.replication.management.ReplicationChannel;
+import org.apache.asterix.replication.management.ReplicationManager;
+import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.INCApplicationContext;
@@ -95,6 +106,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private AsterixTransactionProperties txnProperties;
     private AsterixFeedProperties feedProperties;
     private AsterixBuildProperties buildProperties;
+    private AsterixReplicationProperties replicationProperties;
 
     private AsterixThreadExecutor threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
@@ -110,6 +122,11 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
     private IFeedManager feedManager;
 
+    private IReplicationChannel replicationChannel;
+    private IReplicationManager replicationManager;
+    private IRemoteRecoveryManager remoteRecoveryManager;
+    private IReplicaResourcesManager replicaResourcesManager;
+
     public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
         this.ncApplicationContext = ncApplicationContext;
         compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -119,6 +136,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         txnProperties = new AsterixTransactionProperties(ASTERIX_PROPERTIES_ACCESSOR);
         feedProperties = new AsterixFeedProperties(ASTERIX_PROPERTIES_ACCESSOR);
         buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
+                AsterixClusterProperties.INSTANCE.getCluster());
     }
 
     public void initialize() throws IOException, ACIDException, AsterixException {
@@ -131,8 +150,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         ioManager = ncApplicationContext.getRootContext().getIOManager();
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
                 storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
-        bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
-                storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
 
         AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory());
         lsmIOScheduler = AsynchronousScheduler.INSTANCE;
@@ -142,7 +159,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
                 ioManager, ncApplicationContext.getNodeId());
         localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
-        resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
+        initializeResourceIdFactory();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
                 this);
@@ -157,14 +174,61 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         feedManager = new FeedManager(ncApplicationContext.getNodeId(), feedProperties,
                 compilerProperties.getFrameSize());
 
+        if (replicationProperties.isReplicationEnabled()) {
+            String nodeId = ncApplicationContext.getNodeId();
+
+            replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
+                    metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+
+            replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
+                    txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
+
+            //pass replication manager to replication required object
+            //LogManager to replicate logs
+            txnSubsystem.getLogManager().setReplicationManager(replicationManager);
+
+            //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
+            ((PersistentLocalResourceRepository) localResourceRepository).setReplicationManager(replicationManager);
+
+            //initialize replication channel
+            replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(),
+                    replicaResourcesManager, replicationManager, ncApplicationContext,
+                    asterixAppRuntimeContextProvider);
+
+            remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties);
+
+            bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
+                    storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(),
+                    replicationManager);
+
+        } else {
+            bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
+                    storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
+        }
+
         // The order of registration is important. The buffer cache must registered before recovery and transaction managers.
+        //Note: registered components are stopped in reverse order
         ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
         lccm.register((ILifeCycleComponent) bufferCache);
-        lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+        /**
+         * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
+         * to process any logs that might be generated during stopping these components
+         */
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
+        /**
+         * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
+         * during closing datasets are sent to remote replicas
+         */
+        if (replicationManager != null) {
+            lccm.register(replicationManager);
+        }
+        /**
+         * Stopping indexLifecycleManager will flush and close all datasets.
+         */
         lccm.register((ILifeCycleComponent) datasetLifecycleManager);
+        lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
-        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
     }
 
     public boolean isShuttingdown() {
@@ -276,4 +340,34 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     public IFeedManager getFeedManager() {
         return feedManager;
     }
-}
+
+    @Override
+    public AsterixReplicationProperties getReplicationProperties() {
+        return replicationProperties;
+    }
+
+    @Override
+    public IReplicationChannel getReplicationChannel() {
+        return replicationChannel;
+    }
+
+    @Override
+    public IReplicaResourcesManager getReplicaResourcesManager() {
+        return replicaResourcesManager;
+    }
+
+    @Override
+    public IRemoteRecoveryManager getRemoteRecoveryManager() {
+        return remoteRecoveryManager;
+    }
+
+    @Override
+    public IReplicationManager getReplicationManager() {
+        return replicationManager;
+    }
+
+    @Override
+    public void initializeResourceIdFactory() throws HyracksDataException {
+        resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5aae42a..90697ab 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -21,9 +21,6 @@ package org.apache.asterix.hyracks.bootstrap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
 import org.apache.asterix.api.http.servlet.APIServlet;
 import org.apache.asterix.api.http.servlet.AQLAPIServlet;
 import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
@@ -35,14 +32,14 @@ import org.apache.asterix.api.http.servlet.QueryStatusAPIServlet;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
-import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.feeds.api.ICentralFeedManager;
 import org.apache.asterix.event.service.ILookupService;
-import org.apache.asterix.feeds.CentralFeedManager;
 import org.apache.asterix.feeds.FeedLifecycleListener;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -50,11 +47,15 @@ import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.replication.management.ReplicationLifecycleListener;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
 
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
@@ -101,13 +102,18 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         feedServer.start();
 
         ExternalLibraryBootstrap.setUpExternaLibraries(false);
-        centralFeedManager = CentralFeedManager.getInstance();
-        centralFeedManager.start();
 
         AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
                 (HyracksConnection) getNewHyracksClientConnection());
         ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
 
+        AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
+                .getReplicationProperties();
+        if (asterixRepliactionProperties.isReplicationEnabled()) {
+            ReplicationLifecycleListener.INSTANCE = new ReplicationLifecycleListener(asterixRepliactionProperties);
+            ClusterManager.INSTANCE.registerSubscriber(ReplicationLifecycleListener.INSTANCE);
+        }
+
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index e76d43b..9505692 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.api.IClusterManagementWorkResponse;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
 import org.apache.asterix.metadata.cluster.ClusterManager;
@@ -95,6 +96,18 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
                 LOGGER.info("NC: " + deadNode + " left");
             }
             AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
+
+            //if metadata node failed, we need to rebind the proxy connection when it joins again.
+            //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
+            if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
+                String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
+                String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
+                String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
+                if (deadNode.equals(completeMetadataNodeName)) {
+                    MetadataManager.INSTANCE.rebindMetadataNode = true;
+                }
+            }
+
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
         Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 63f862c..a95d747 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -19,22 +19,22 @@
 package org.apache.asterix.hyracks.bootstrap;
 
 import java.io.File;
+import java.io.IOException;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
 import org.apache.asterix.api.common.AsterixAppRuntimeContext;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.event.schema.cluster.Cluster;
@@ -44,12 +44,18 @@ import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.replication.storage.AsterixFilesUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -66,7 +72,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private boolean isMetadataNode = false;
     private boolean stopInitiated = false;
     private SystemState systemState = SystemState.NEW_UNIVERSE;
-    private final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
+    private boolean performedRemoteRecovery = false;
+    private boolean replicationEnabled = false;
 
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
@@ -100,6 +107,12 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         runtimeContext.initialize();
         ncApplicationContext.setApplicationObject(runtimeContext);
 
+        //if replication is enabled, check if there is a replica for this node
+        AsterixReplicationProperties asterixReplicationProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getReplicationProperties();
+
+        replicationEnabled = asterixReplicationProperties.isReplicationEnabled();
+
         if (initialRun) {
             LOGGER.info("System is being initialized. (first run)");
             systemState = SystemState.NEW_UNIVERSE;
@@ -112,11 +125,42 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 LOGGER.info("System is in a state: " + systemState);
             }
 
+            if (replicationEnabled) {
+                if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
+                    //try to perform remote recovery
+                    IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
+                    remoteRecoveryMgr.performRemoteRecovery();
+                    performedRemoteRecovery = true;
+                    systemState = SystemState.HEALTHY;
+                }
+            }
+
             if (systemState == SystemState.CORRUPTED) {
                 recoveryMgr.startRecovery(true);
             }
         }
 
+        if (replicationEnabled) {
+            startReplicationService();
+        }
+    }
+
+    private void startReplicationService() throws IOException {
+        //open replication channel
+        runtimeContext.getReplicationChannel().start();
+
+        //check the state of remote replicas
+        runtimeContext.getReplicationManager().initializeReplicasState();
+
+        if (performedRemoteRecovery) {
+            //notify remote replicas about the new IP Address if changed
+            //Note: this is a hack since each node right now maintains its own copy of the cluster configuration.
+            //Once the configuration is centralized on the CC, this step won't be needed.
+            runtimeContext.getReplicationManager().broadcastNewIPAddress();
+        }
+
+        //start replication after the state of remote replicas has been initialized. 
+        runtimeContext.getReplicationManager().startReplicationThreads();
     }
 
     @Override
@@ -128,13 +172,14 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 LOGGER.info("Stopping Asterix node controller: " + nodeId);
             }
 
-            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-            recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
-
             if (isMetadataNode) {
                 MetadataBootstrap.stopUniverse();
             }
 
+            //clean any temporary files
+            performLocalCleanUp();
+
+            //Note: stopping recovery manager will make a sharp checkpoint
             ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
             runtimeContext.deinitialize();
         } else {
@@ -211,7 +256,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         lccm.startAll();
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
+        recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
 
         if (isMetadataNode) {
             IMetadataNode stub = null;
@@ -219,20 +264,31 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             proxy.setMetadataNode(stub);
         }
 
-        // Reclaim storage for temporary datasets.
-        String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nodeId);
-        String[] nodeStores = metadataProperties.getStores().get(nodeId);
-        int numIoDevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nodeId);
-        for (int j = 0; j < nodeStores.length; j++) {
-            for (int k = 0; k < numIoDevices; k++) {
-                File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + "temp");
-                f.delete();
-            }
+        //clean any temporary files
+        performLocalCleanUp();
+    }
+
+    private void performLocalCleanUp() throws IOException {
+        //delete working area files from failed jobs
+        runtimeContext.getIOManager().deleteWorkspaceFiles();
+
+        //reclaim storage for temporary datasets.
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                .getLocalResourceRepository();
+
+        String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
+        String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+                .get(nodeId)[0];
+
+        for (String mountPoint : storageMountingPoints) {
+            String tempDatasetFolder = mountPoint + storageFolderName + File.separator
+                    + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
+            AsterixFilesUtil.deleteFolder(tempDatasetFolder);
         }
 
         // TODO
-        // reclaim storage for orphaned index artifacts in NCs.
-
+        //reclaim storage for orphaned index artifacts in NCs.
+        //Note: currently, invalid components of LSM indexes are deleted when an index is activated.
     }
 
     private void updateOnNodeJoin() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml
index 57f9f66..512502b 100644
--- a/asterix-common/pom.xml
+++ b/asterix-common/pom.xml
@@ -25,14 +25,14 @@
 	</parent>
 	<artifactId>asterix-common</artifactId>
 
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
+	<licenses>
+		<license>
+			<name>Apache License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+			<distribution>repo</distribution>
+			<comments>A business-friendly OSS license</comments>
+		</license>
+	</licenses>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index f86ed8a..63851bf 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -25,6 +25,10 @@ import java.util.concurrent.Executor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.feeds.api.IFeedManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationChannel;
+import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
@@ -76,4 +80,14 @@ public interface IAsterixAppRuntimeContext {
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
 
     public IFeedManager getFeedManager();
+
+    public IRemoteRecoveryManager getRemoteRecoveryManager();
+
+    public IReplicaResourcesManager getReplicaResourcesManager();
+
+    public IReplicationManager getReplicationManager();
+
+    public IReplicationChannel getReplicationChannel();
+
+    public void initializeResourceIdFactory() throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index e1e6d96..803e708 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.api;
 import java.util.List;
 
 import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -38,12 +39,14 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
 
     /**
      * Flushes all open datasets synchronously.
+     * 
      * @throws HyracksDataException
      */
     void flushAllDatasets() throws HyracksDataException;
 
     /**
      * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+     * 
      * @param nonSharpCheckpointTargetLSN
      * @throws HyracksDataException
      */
@@ -51,6 +54,7 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
 
     /**
      * creates (if necessary) and returns the dataset info.
+     * 
      * @param datasetID
      * @return
      */
@@ -67,6 +71,7 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
 
     /**
      * creates (if necessary) and returns the primary index operation tracker of a dataset.
+     * 
      * @param datasetID
      * @return
      */
@@ -74,8 +79,19 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
+     * 
      * @param datasetID
      * @return
      */
     List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+
+    /**
+     * Flushes then closes all open datasets
+     */
+    void closeAllDatasets() throws HyracksDataException;
+
+    /**
+     * @return a list of all indexes that are open at the time of the call.
+     */
+    List<IndexInfo> getOpenIndexesInfo();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
new file mode 100644
index 0000000..1ef7e3e
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class AsterixReplicationProperties extends AbstractAsterixProperties {
+
+    private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
+
+    private static int REPLICATION_DATAPORT_DEFAULT = 2000;
+    private static int REPLICATION_FACTOR_DEFAULT = 1;
+    private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+
+    private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
+    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
+    private final String NODE_NAME_PREFIX;
+    private final Cluster cluster;
+
+    public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
+        super(accessor);
+        this.cluster = cluster;
+
+        if (cluster != null) {
+            NODE_NAME_PREFIX = cluster.getInstanceName() + "_";
+        } else {
+            NODE_NAME_PREFIX = "";
+        }
+    }
+
+    public boolean isReplicationEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null) {
+            if (getReplicationFactor() == 1) {
+                return false;
+            }
+
+            return cluster.getDataReplication().isEnabled();
+
+        } else {
+            return false;
+        }
+    }
+
+    public String getReplicaIPAddress(String nodeId) {
+        if (cluster != null) {
+
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    return node.getClusterIp();
+                }
+            }
+        }
+        return NODE_IP_ADDRESS_DEFAULT;
+    }
+
+    public int getDataReplicationPort(String nodeId) {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationPort().intValue();
+        }
+
+        return REPLICATION_DATAPORT_DEFAULT;
+    }
+
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<Replica>();;
+
+        int numberOfRemoteReplicas = getReplicationFactor() - 1;
+
+        //Using chained-declustering
+        if (cluster != null) {
+            int nodeIndex = -1;
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    nodeIndex = i;
+                    break;
+                }
+            }
+
+            if (nodeIndex == -1) {
+                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
+                        + " in cluster configurations");
+                return null;
+            }
+
+            for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
+                remoteReplicas.add(getReplicaByNodeIndex(i));
+
+                if (remoteReplicas.size() == numberOfRemoteReplicas) {
+                    break;
+                }
+            }
+
+            if (remoteReplicas.size() != numberOfRemoteReplicas) {
+                for (int i = 0; i < cluster.getNode().size(); i++) {
+
+                    remoteReplicas.add(getReplicaByNodeIndex(i));
+
+                    if (remoteReplicas.size() == numberOfRemoteReplicas) {
+                        break;
+                    }
+                }
+            }
+        }
+        return remoteReplicas;
+    }
+
+    private Replica getReplicaByNodeIndex(int nodeIndex) {
+        Node node = cluster.getNode().get(nodeIndex);
+        Node replicaNode = new Node();
+        replicaNode.setId(getRealCluserNodeID(node.getId()));
+        replicaNode.setClusterIp(node.getClusterIp());
+        return new Replica(replicaNode);
+    }
+
+    public Replica getReplicaById(String nodeId) {
+        int nodeIndex = -1;
+        if (cluster != null) {
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    nodeIndex = i;
+                    break;
+                }
+            }
+        }
+
+        if (nodeIndex < 0) {
+            return null;
+        }
+
+        return getReplicaByNodeIndex(nodeIndex);
+    }
+
+    public Set<String> getRemoteReplicasIds(String nodeId) {
+        Set<String> remoteReplicasIds = new HashSet<String>();
+        Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
+
+        for (Replica replica : remoteReplicas) {
+            remoteReplicasIds.add(replica.getId());
+        }
+
+        return remoteReplicasIds;
+    }
+
+    public String getRealCluserNodeID(String nodeId) {
+        return NODE_NAME_PREFIX + nodeId;
+    }
+
+    public Set<String> getNodeReplicasIds(String nodeId) {
+        Set<String> replicaIds = new HashSet<String>();
+        replicaIds.add(nodeId);
+        replicaIds.addAll(getRemoteReplicasIds(nodeId));
+        return replicaIds;
+    }
+
+    public String getReplicationStore() {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationStore();
+        }
+        return REPLICATION_STORE_DEFAULT;
+    }
+
+    public int getReplicationFactor() {
+        if (cluster != null) {
+            if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
+                return REPLICATION_FACTOR_DEFAULT;
+            }
+            return cluster.getDataReplication().getReplicationFactor().intValue();
+        }
+        return REPLICATION_FACTOR_DEFAULT;
+    }
+
+    public int getReplicationTimeOut() {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationTimeOut().intValue();
+        }
+        return REPLICATION_TIME_OUT_DEFAULT;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
index 93c58be..e6f383f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
@@ -28,8 +28,10 @@ public interface IAsterixPropertiesProvider {
     public AsterixMetadataProperties getMetadataProperties();
 
     public AsterixExternalProperties getExternalProperties();
-    
+
     public AsterixFeedProperties getFeedProperties();
 
     AsterixBuildProperties getBuildProperties();
+
+    public AsterixReplicationProperties getReplicationProperties();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 776549a..21500b7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -39,7 +39,8 @@ public class BaseOperationTracker implements ILSMOperationTracker {
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.declareActiveIOOperation();
         }
     }
@@ -47,7 +48,8 @@ public class BaseOperationTracker implements ILSMOperationTracker {
     @Override
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 20b07fa..adf1152 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -74,6 +74,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
+        logRecord.setNodeId(logManager.getNodeId());
     }
 
     @Override
@@ -112,10 +113,10 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         if (dsInfo.indexes.containsKey(resourceID)) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
         }
-        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
+        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
     }
 
-    private int getDIDfromResourceName(String resourceName) throws HyracksDataException {
+    public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
         LocalResource lr = resourceRepository.getResourceByName(resourceName);
         if (lr == null) {
             return -1;
@@ -123,7 +124,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
-    private long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
+    public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
         LocalResource lr = resourceRepository.getResourceByName(resourceName);
         if (lr == null) {
             return -1;
@@ -279,15 +280,25 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
     @Override
     public synchronized List<IIndex> getOpenIndexes() {
+        List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
         List<IIndex> openIndexes = new ArrayList<IIndex>();
+        for (IndexInfo iInfo : openIndexesInfo) {
+            openIndexes.add(iInfo.index);
+        }
+        return openIndexes;
+    }
+
+    @Override
+    public synchronized List<IndexInfo> getOpenIndexesInfo() {
+        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
         for (DatasetInfo dsInfo : datasetInfos.values()) {
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
                 if (iInfo.isOpen) {
-                    openIndexes.add(iInfo.index);
+                    openIndexesInfo.add(iInfo);
                 }
             }
         }
-        return openIndexes;
+        return openIndexesInfo;
     }
 
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
@@ -358,11 +369,27 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         }
     }
 
-    private class IndexInfo extends Info {
+    public class IndexInfo extends Info {
         private final ILSMIndex index;
+        private final long resourceId;
+        private final int datasetId;
 
-        public IndexInfo(ILSMIndex index) {
+        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
             this.index = index;
+            this.datasetId = datasetId;
+            this.resourceId = resourceId;
+        }
+
+        public ILSMIndex getIndex() {
+            return index;
+        }
+
+        public long getResourceId() {
+            return resourceId;
+        }
+
+        public int getDatasetId() {
+            return datasetId;
         }
     }
 
@@ -456,14 +483,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                     + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
                     + memoryAllocated;
         }
-
-        public boolean isMemoryAllocated() {
-            return memoryAllocated;
-        }
-
-        public int getDatasetID() {
-            return datasetID;
-        }
     }
 
     @Override
@@ -520,7 +539,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
         if (!dsInfo.isExternal) {
             synchronized (logRecord) {
-                logRecord.formFlushLogRecord(dsInfo.datasetID, null);
+                logRecord.formFlushLogRecord(dsInfo.datasetID, null, dsInfo.indexes.size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -588,15 +607,19 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         removeDatasetFromCache(dsInfo.datasetID);
     }
 
+    public void closeAllDatasets() throws HyracksDataException {
+        for (DatasetInfo dsInfo : datasetInfos.values()) {
+            closeDataset(dsInfo);
+        }
+    }
+
     @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
         if (dumpState) {
             dumpState(outputStream);
         }
 
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            closeDataset(dsInfo);
-        }
+        closeAllDatasets();
 
         datasetVirtualBufferCaches.clear();
         datasetOpTrackers.clear();
@@ -686,4 +709,4 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         int did = Integer.parseInt(resourceName);
         allocateDatasetMemory(did);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b4a3ac9..437fac4 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             incrementNumActiveOperations(modificationCallback);
-        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.declareActiveIOOperation();
         }
     }
@@ -68,7 +69,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Searches are immediately considered complete, because they should not prevent the execution of flushes.
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             completeOperation(index, opType, searchCallback, modificationCallback);
         }
     }
@@ -84,7 +86,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
             } else if (numActiveOperations.get() < 0) {
                 throw new HyracksDataException("The number of active operations cannot be negative!");
             }
-        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
@@ -119,7 +122,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
             }
 
             LogRecord logRecord = new LogRecord();
-            logRecord.formFlushLogRecord(datasetID, this);
+            logRecord.formFlushLogRecord(datasetID, this, logManager.getNodeId(), dsInfo.getDatasetIndexes().size());
 
             try {
                 logManager.log(logRecord);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
index 2cd0554..4259a10 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -41,4 +41,10 @@ public class AsterixLSMIndexUtil {
             }
         }
     }
+
+    public static boolean lsmComponentFileHasLSN(AbstractLSMIndex lsmIndex, String componentFilePath) {
+        AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+                .getIOOperationCallback();
+        return ioOpCallback.componentFileHasLSN(componentFilePath);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index abf7ba9..76a11d1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -151,4 +151,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         }
         return false;
     }
+    
+    public abstract boolean componentFileHasLSN(String componentFilePath);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 12680fd..8b4fa01 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
@@ -60,4 +61,13 @@ public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b5ce879..229ccd6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
@@ -55,4 +56,14 @@ public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperation
         return maxLSN;
     }
 
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)
+                || componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BUDDY_BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index fd3cf12..3e4ff04 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -58,4 +59,13 @@ public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationC
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
+            return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 5d243a3..7c483f3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -59,4 +60,14 @@ public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)
+                || componentFilePath.endsWith(LSMRTreeFileManager.BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
new file mode 100644
index 0000000..fc4f1ab
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.replication.impl.AbstractReplicationJob;
+
+/**
+ * LSMIndexReplicationJob is used only for LSM components, at the Hyracks level.
+ * AsterixReplicationJob is used for everything else.
+ * Currently, it is used to transfer index metadata files.
+ */
+public class AsterixReplicationJob extends AbstractReplicationJob {
+
+    public AsterixReplicationJob(ReplicationJobType jobType, ReplicationOperation operation,
+            ReplicationExecutionType executionType, Set<String> filesToReplicate) {
+        super(jobType, operation, executionType, filesToReplicate);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
new file mode 100644
index 0000000..63d29a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+public interface IRemoteRecoveryManager {
+
+    public void performRemoteRecovery();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
new file mode 100644
index 0000000..f9481a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.Set;
+
+public interface IReplicaResourcesManager {
+
+    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
+
+    public String getLocalStorageFolder();
+
+    public long getMinRemoteLSN(Set<String> remoteNodes);
+
+    public void deleteAsterixStorageData() throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
new file mode 100644
index 0000000..56ae20f
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+
+public interface IReplicationChannel {
+
+    /**
+     * Opens the replication channel and starts accepting replication requests.
+     */
+    public void start();
+
+    /**
+     * Closes the replication channel.
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
new file mode 100644
index 0000000..276d498
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+
+public interface IReplicationManager extends IIOReplicationManager {
+
+    /**
+     * Asynchronously sends a serialized version of the record to remote replicas.
+     * 
+     * @param logRecord
+     *            The log record to be replicated.
+     */
+    public void replicateLog(ILogRecord logRecord);
+
+    /**
+     * Checks whether a log record has been replicated.
+     * 
+     * @param logRecord
+     *            the log to check for.
+     * @return true, if all ACKs were received from remote replicas.
+     */
+    public boolean hasBeenReplicated(ILogRecord logRecord);
+
+    /**
+     * Requests transaction logs from a remote replica.
+     * 
+     * @param remoteReplicaId
+     *            The replica id to send the request to.
+     * @param replicasDataToRecover
+     *            Get logs that belong to those replicas.
+     * @param fromLSN
+     *            Low water mark for logs to be requested.
+     * @return The logs received that belong to the local node.
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public ArrayList<ILogRecord> requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover,
+            long fromLSN) throws IOException, ACIDException;
+
+    /**
+     * Requests LSM components files from a remote replica.
+     * 
+     * @param remoteReplicaId
+     *            The replica id to send the request to.
+     * @param replicasDataToRecover
+     *            Get files that belong to those replicas.
+     * @throws IOException
+     */
+    public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover) throws IOException;
+
+    /**
+     * Requests current maximum LSN from remote replicas.
+     * 
+     * @param remoteReplicaIds
+     *            remote replicas to send the request to.
+     * @return The maximum of the received maximum LSNs.
+     * @throws IOException
+     */
+    public long getMaxRemoteLSN(Set<String> remoteReplicaIds) throws IOException;
+
+    /**
+     * Sends the IP address of the local replica to all remote replicas.
+     * 
+     * @throws IOException
+     */
+    public void broadcastNewIPAddress() throws IOException;
+
+    /**
+     * @return The number of remote replicas that are in ACTIVE state.
+     */
+    public int getActiveReplicasCount();
+
+    /**
+     * @return The IDs of the remote replicas that are in DEAD state.
+     */
+    public Set<String> getDeadReplicasIds();
+
+    /**
+     * Starts processing of ASYNC replication jobs as well as Txn logs.
+     */
+    public void startReplicationThreads();
+
+    /**
+     * Checks and sets each remote replica state.
+     */
+    public void initializeReplicasState();
+
+    /**
+     * Updates remote replica (in-memory) information.
+     * 
+     * @param replica
+     *            the replica to update.
+     */
+    public void updateReplicaInfo(Replica replica);
+
+    /**
+     * @return The IDs of the remote replicas that are in ACTIVE state.
+     */
+    public Set<String> getActiveReplicasIds();
+
+    /**
+     * Submits a ReplicaEvent to ReplicationEventsMonitor thread.
+     * 
+     * @param event
+     */
+    public void reportReplicaEvent(ReplicaEvent event);
+
+    /**
+     * Requests the current minimum LSN of a remote replica.
+     * 
+     * @param replicaId
+     *            The replica to send the request to.
+     * @return The returned minimum LSN from the remote replica.
+     * @throws IOException
+     */
+    public long requestReplicaMinLSN(String replicaId) throws IOException;
+
+    /**
+     * Sends a request to remote replicas to flush indexes whose LSN is less than nonSharpCheckpointTargetLSN.
+     * 
+     * @param nonSharpCheckpointTargetLSN
+     * @throws IOException
+     */
+    public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
new file mode 100644
index 0000000..3e2569d
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.transactions.LogRecord;
+
+public interface IReplicationThread extends Runnable {
+
+    /**
+     * Sends a notification to this thread that logRecord has been flushed.
+     * 
+     * @param logRecord
+     *            The log that has been flushed.
+     */
+    public void notifyLogReplicationRequester(LogRecord logRecord);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
new file mode 100644
index 0000000..4c3f728
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class Replica {
+
+    public enum ReplicaState {
+        ACTIVE,
+        DEAD,
+        UNKNOWN
+    }
+
+    final Node node;
+    private ReplicaState state = ReplicaState.UNKNOWN;
+
+    public Replica(Node node) {
+        this.node = node;
+    }
+
+    public ReplicaState getState() {
+        return state;
+    }
+
+    public void setState(ReplicaState state) {
+        this.state = state;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public String getId() {
+        return node.getId();
+    }
+
+    public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
+        String replicaIPAddress = node.getClusterIp();
+        int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
+        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
+        return replicaAddress;
+    }
+
+    public static Replica create(DataInput input) throws IOException {
+        Node node = new Node();
+        Replica replica = new Replica(node);
+        replica.readFields(input);
+        return replica;
+    }
+
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(node.getId());
+        output.writeUTF(node.getClusterIp());
+        output.writeInt(state.ordinal());
+    }
+
+    public void readFields(DataInput input) throws IOException {
+        this.node.setId(input.readUTF());
+        this.node.setClusterIp(input.readUTF());
+        this.state = ReplicaState.values()[input.readInt()];
+    }
+
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dos = new DataOutputStream(out);
+        writeFields(dos);
+    }
+}


Mime
View raw message