asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [3/6] incubator-asterixdb git commit: Asterix NCs Failback Support
Date Thu, 18 Feb 2016 09:54:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e4d94b4..b9447af 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +53,6 @@ import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationThread;
-import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
@@ -72,6 +72,7 @@ import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -97,6 +98,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
     private final Map<Long, RemoteLogMapping> localLSN2RemoteLSNMap;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
+    private final Set<Integer> nodeHostedPartitions;
 
     public ReplicationChannel(String nodeId, AsterixReplicationProperties replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -112,6 +114,17 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         localLSN2RemoteLSNMap = new ConcurrentHashMap<Long, RemoteLogMapping>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
+        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                .getAppContext()).getMetadataProperties().getNodePartitions();
+        Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
+        List<Integer> clientsPartitions = new ArrayList<>();
+        for (String clientId : nodeReplicationClients) {
+            for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
+                clientsPartitions.add(clusterPartition.getPartitionId());
+            }
+        }
+        nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
+        nodeHostedPartitions.addAll(clientsPartitions);
     }
 
     @Override
@@ -193,6 +206,18 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
     }
 
+    private static void sendRemoteRecoveryLog(ILogRecord logRecord, SocketChannel socketChannel, ByteBuffer outBuffer)
+            throws IOException {
+        logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
+        if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
+            int requestSize = logRecord.getSerializedLogSize() + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+            outBuffer = ByteBuffer.allocate(requestSize);
+        }
+        //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
+        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+    }
+
     /**
      * A replication thread is created per received replication request.
      */
@@ -232,9 +257,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                         case REPLICA_EVENT:
                             handleReplicaEvent();
                             break;
-                        case UPDATE_REPLICA:
-                            handleUpdateReplica();
-                            break;
                         case GET_REPLICA_MAX_LSN:
                             handleGetReplicaMaxLSN();
                             break;
@@ -379,29 +401,33 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
 
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
+            Set<String> requesterExistingFiles = request.getExistingFiles();
             Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
                     .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
                 ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
                 for (ClusterPartition partition : replicaPatitions) {
-                    filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
-
+                    filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
                     //start sending files
                     for (String filePath : filesList) {
-                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                                FileChannel fileChannel = fromFile.getChannel();) {
-                            long fileSize = fileChannel.size();
-                            fileProperties.initialize(filePath, fileSize, replicaId, false,
-                                    IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-                            outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
-                                    ReplicationRequestType.REPLICATE_FILE);
-
-                            //send file info
-                            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-
-                            //transfer file
-                            NetworkingUtil.sendFile(fileChannel, socketChannel);
+                        String relativeFilePath = PersistentLocalResourceRepository.getResourceRelativePath(filePath);
+                        //if the file already exists on the requester, skip it
+                        if (!requesterExistingFiles.contains(relativeFilePath)) {
+                            try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+                                    FileChannel fileChannel = fromFile.getChannel();) {
+                                long fileSize = fileChannel.size();
+                                fileProperties.initialize(filePath, fileSize, replicaId, false,
+                                        IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+                                outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
+                                        ReplicationRequestType.REPLICATE_FILE);
+
+                                //send file info
+                                NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+
+                                //transfer file
+                                NetworkingUtil.sendFile(fileChannel, socketChannel);
+                            }
                         }
                     }
                 }
@@ -416,11 +442,23 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
 
             Set<String> replicaIds = request.getReplicaIds();
+            //get list of partitions that belong to the replicas in the request
+            Set<Integer> requestedPartitions = new HashSet<>();
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                    .getAppContext()).getMetadataProperties().getNodePartitions();
+            for (String replicaId : replicaIds) {
+                //get replica partitions
+                ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+                for (ClusterPartition partition : replicaPatitions) {
+                    requestedPartitions.add(partition.getPartitionId());
+                }
+            }
+
             long fromLSN = request.getFromLSN();
             long minLocalFirstLSN = asterixAppRuntimeContextProvider.getAppContext().getTransactionSubsystem()
                     .getRecoveryManager().getLocalMinFirstLSN();
 
-            //get Log read
+            //get Log reader
             ILogReader logReader = logManager.getLogReader(true);
             try {
                 if (fromLSN < logManager.getReadableSmallestLSN()) {
@@ -429,25 +467,34 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
 
                 logReader.initializeScan(fromLSN);
                 ILogRecord logRecord = logReader.next();
+                Set<Integer> requestedPartitionsJobs = new HashSet<>();
                 while (logRecord != null) {
                     //we should not send any local log which has already been converted to disk component
                     if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLSN() < minLocalFirstLSN) {
                         logRecord = logReader.next();
                         continue;
                     }
-
-                    //since flush logs are not required for recovery, skip them
-                    if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
-                        if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
-                            int requestSize = logRecord.getSerializedLogSize()
-                                    + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
-                            outBuffer = ByteBuffer.allocate(requestSize);
-                        }
-
-                        //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
-                        logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-                        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
-                        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+                    //send only logs that belong to the partitions of the request and required for recovery
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.UPSERT_ENTITY_COMMIT:
+                            if (requestedPartitions.contains(logRecord.getResourcePartition())) {
+                                sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
+                                requestedPartitionsJobs.add(logRecord.getJobId());
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                            if (requestedPartitionsJobs.contains(logRecord.getJobId())) {
+                                sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
+                                requestedPartitionsJobs.remove(logRecord.getJobId());
+                            }
+                            break;
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                     }
                     logRecord = logReader.next();
                 }
@@ -459,12 +506,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
-        private void handleUpdateReplica() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
-            replicationManager.updateReplicaInfo(replica);
-        }
-
         private void handleReplicaEvent() throws IOException {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
@@ -484,37 +525,45 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
 
             //Deserialize log
-            remoteLog.readRemoteLog(inBuffer, false, localNodeID);
+            remoteLog.readRemoteLog(inBuffer, false);
             remoteLog.setLogSource(LogSource.REMOTE);
 
-            if (remoteLog.getLogType() == LogType.JOB_COMMIT) {
-                LogRecord jobCommitLog = new LogRecord();
-                TransactionUtil.formJobTerminateLogRecord(jobCommitLog, remoteLog.getJobId(), true,
-                        remoteLog.getNodeId());
-                jobCommitLog.setReplicationThread(this);
-                jobCommitLog.setLogSource(LogSource.REMOTE);
-                logManager.log(jobCommitLog);
-            } else if (remoteLog.getLogType() == LogType.FLUSH) {
-                LogRecord flushLog = new LogRecord();
-                TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, remoteLog.getNodeId(),
-                        remoteLog.getNumOfFlushedIndexes());
-                flushLog.setReplicationThread(this);
-                flushLog.setLogSource(LogSource.REMOTE);
-                synchronized (localLSN2RemoteLSNMap) {
-                    logManager.log(flushLog);
-
-                    //store mapping information for flush logs to use them in incoming LSM components.
-                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                    flushLogMap.setLocalLSN(flushLog.getLSN());
-                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                    localLSN2RemoteLSNMap.put(flushLog.getLSN(), flushLogMap);
-                    localLSN2RemoteLSNMap.notifyAll();
-                }
-            } else {
-                //send log to LogManager as a remote log
-                logManager.log(remoteLog);
+            switch (remoteLog.getLogType()) {
+                case LogType.UPDATE:
+                case LogType.ENTITY_COMMIT:
+                case LogType.UPSERT_ENTITY_COMMIT:
+                    //if the log partition belongs to a partitions hosted on this node, replicate it
+                    if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                        logManager.log(remoteLog);
+                    }
+                    break;
+                case LogType.JOB_COMMIT:
+                    LogRecord jobCommitLog = new LogRecord();
+                    TransactionUtil.formJobTerminateLogRecord(jobCommitLog, remoteLog.getJobId(), true);
+                    jobCommitLog.setReplicationThread(this);
+                    jobCommitLog.setLogSource(LogSource.REMOTE);
+                    logManager.log(jobCommitLog);
+                    break;
+                case LogType.FLUSH:
+                    LogRecord flushLog = new LogRecord();
+                    TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, remoteLog.getNodeId(),
+                            remoteLog.getNumOfFlushedIndexes());
+                    flushLog.setReplicationThread(this);
+                    flushLog.setLogSource(LogSource.REMOTE);
+                    synchronized (localLSN2RemoteLSNMap) {
+                        logManager.log(flushLog);
+                        //store mapping information for flush logs to use them in incoming LSM components.
+                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                        flushLogMap.setLocalLSN(flushLog.getLSN());
+                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                        localLSN2RemoteLSNMap.put(flushLog.getLSN(), flushLogMap);
+                        localLSN2RemoteLSNMap.notifyAll();
+                    }
+                    break;
+                default:
+                    throw new ACIDException("Unsupported LogType: " + remoteLog.getLogType());
             }
         }
 
@@ -649,4 +698,4 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationLifecycleListener.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationLifecycleListener.java
deleted file mode 100644
index 39130a4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationLifecycleListener.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.management;
-
-import java.util.Set;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.replication.ReplicaEvent.ReplicaEventType;
-
-public class ReplicationLifecycleListener implements IClusterEventsSubscriber {
-
-    private final AsterixReplicationProperties asterixReplicationProperties;
-    public static ReplicationLifecycleListener INSTANCE;
-
-    public ReplicationLifecycleListener(AsterixReplicationProperties asterixReplicationProperties) {
-        this.asterixReplicationProperties = asterixReplicationProperties;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        //notify impacted replicas
-        for (String deadNodeId : deadNodeIds) {
-            Replica eventOnReplica = asterixReplicationProperties.getReplicaById(deadNodeId);
-            ReplicaEvent event = new ReplicaEvent(eventOnReplica, ReplicaEventType.FAIL);
-            ReplicaEventNotifier notifier = new ReplicaEventNotifier(event, asterixReplicationProperties);
-
-            //start notifier
-            new Thread(notifier).start();
-        }
-
-        return null;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        Replica eventOnReplica = asterixReplicationProperties.getReplicaById(joinedNodeId);
-        ReplicaEvent event = new ReplicaEvent(eventOnReplica, ReplicaEventType.JOIN);
-        ReplicaEventNotifier notifier = new ReplicaEventNotifier(event, asterixReplicationProperties);
-
-        //start notifier
-        new Thread(notifier).start();
-
-        return null;
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        //do nothing
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        //do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5c35df4..7243629 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -50,7 +51,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
@@ -59,23 +62,25 @@ import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.replication.ReplicaEvent.ReplicaEventType;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
 import org.apache.asterix.replication.logging.ReplicationLogFlusher;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -107,6 +112,7 @@ public class ReplicationManager implements IReplicationManager {
     private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
     private final AsterixReplicationProperties replicationProperties;
     private final Map<String, Replica> replicas;
+    private final Map<String, Set<Integer>> replica2PartitionsMap;
 
     private final AtomicBoolean replicationSuspended;
     private AtomicBoolean terminateJobsReplication;
@@ -117,8 +123,8 @@ public class ReplicationManager implements IReplicationManager {
     private ReplicationJobsProccessor replicationJobsProcessor;
     private final ReplicasEventsMonitor replicationMonitor;
     //dummy job used to stop ReplicationJobsProccessor thread.
-    private static final IReplicationJob replicationJobPoisonPill = new AsterixReplicationJob(
-            ReplicationJobType.METADATA, ReplicationOperation.STOP, ReplicationExecutionType.ASYNC, null);
+    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new AsterixReplicationJob(
+            ReplicationJobType.METADATA, ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, null);
     //used to identify the correct IP address when the node has multiple network interfaces
     private String hostIPAddressFirstOctet = null;
 
@@ -128,6 +134,7 @@ public class ReplicationManager implements IReplicationManager {
     private ReplicationLogFlusher txnlogsReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
     private Map<String, SocketChannel> logsReplicaSockets = null;
+
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
     public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
@@ -144,7 +151,6 @@ public class ReplicationManager implements IReplicationManager {
         terminateJobsReplication = new AtomicBoolean(false);
         jobsReplicationSuspended = new AtomicBoolean(true);
         replicationSuspended = new AtomicBoolean(true);
-
         replicas = new HashMap<String, Replica>();
         jobCommitAcks = new ConcurrentHashMap<Integer, Set<String>>();
         replicationJobsPendingAcks = new ConcurrentHashMap<Integer, ILogRecord>();
@@ -156,12 +162,25 @@ public class ReplicationManager implements IReplicationManager {
         replicationJobsProcessor = new ReplicationJobsProccessor();
         replicationMonitor = new ReplicasEventsMonitor();
 
+        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                .getAppContext()).getMetadataProperties().getNodePartitions();
         //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
         Set<Replica> replicaNodes = replicationProperties.getRemoteReplicas(nodeId);
-        if (replicaNodes != null) {
-            for (Replica replica : replicaNodes) {
-                replicas.put(replica.getNode().getId(), replica);
+        replica2PartitionsMap = new HashMap<>(replicaNodes.size());
+        for (Replica replica : replicaNodes) {
+            replicas.put(replica.getNode().getId(), replica);
+            //for each remote replica, get the list of replication clients
+            Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(replica.getId());
+            //get the partitions of each client
+            List<Integer> clientPartitions = new ArrayList<>();
+            for (String clientId : nodeReplicationClients) {
+                for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
+                    clientPartitions.add(clusterPartition.getPartitionId());
+                }
             }
+            Set<Integer> clientPartitonsSet = new HashSet<>(clientPartitions.size());
+            clientPartitonsSet.addAll(clientPartitions);
+            replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
         }
         int numLogBuffers = logManager.getNumLogPages();
         emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
@@ -173,10 +192,6 @@ public class ReplicationManager implements IReplicationManager {
         }
     }
 
-    /**
-     * Accepts a replication job. If the job execution type is ASYNC, it is queued.
-     * Otherwise, it is processed immediately.
-     */
     @Override
     public void submitJob(IReplicationJob job) throws IOException {
         if (job.getExecutionType() == ReplicationExecutionType.ASYNC) {
@@ -253,24 +268,28 @@ public class ReplicationManager implements IReplicationManager {
      */
     private void processJob(IReplicationJob job, Map<String, SocketChannel> replicasSockets, ByteBuffer requestBuffer)
             throws IOException {
-        boolean isLSMComponentFile;
-        ByteBuffer responseBuffer = null;
-        LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
-        if (requestBuffer == null) {
-            requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-        }
-
-        isLSMComponentFile = job.getJobType() == ReplicationJobType.LSM_COMPONENT ? true : false;
         try {
-            //if there isn't already a connection, establish a new one
-            if (replicasSockets == null) {
-                replicasSockets = getActiveRemoteReplicasSockets();
+
+            //all of the job's files belong to a single storage partition.
+            //get any of them to determine the partition from the file path.
+            String jobFile = job.getJobFiles().iterator().next();
+            int jobPartitionId = PersistentLocalResourceRepository.getResourcePartition(jobFile);
+
+            ByteBuffer responseBuffer = null;
+            LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
+            if (requestBuffer == null) {
+                requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
             }
 
-            int remainingFiles = job.getJobFiles().size();
+            boolean isLSMComponentFile = job.getJobType() == ReplicationJobType.LSM_COMPONENT;
+            try {
+                //if there isn't already a connection, establish a new one
+                if (replicasSockets == null) {
+                    replicasSockets = getActiveRemoteReplicasSockets();
+                }
 
-            if (job.getOperation() == ReplicationOperation.REPLICATE) {
-                try {
+                int remainingFiles = job.getJobFiles().size();
+                if (job.getOperation() == ReplicationOperation.REPLICATE) {
                     //if the replication job is an LSM_COMPONENT, its properties are sent first, then its files.
                     ILSMIndexReplicationJob LSMComponentJob = null;
                     if (job.getJobType() == ReplicationJobType.LSM_COMPONENT) {
@@ -316,6 +335,10 @@ public class ReplicationManager implements IReplicationManager {
                             Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
                             while (iterator.hasNext()) {
                                 Map.Entry<String, SocketChannel> entry = iterator.next();
+                                //if the remote replica is not interested in this partition, skip it.
+                                if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) {
+                                    continue;
+                                }
                                 SocketChannel socketChannel = entry.getValue();
                                 //transfer request header & file
                                 try {
@@ -338,44 +361,52 @@ public class ReplicationManager implements IReplicationManager {
                             }
                         }
                     }
-                } finally {
-                    if (job instanceof ILSMIndexReplicationJob) {
-                        //exit the replicated LSM components
-                        ILSMIndexReplicationJob aJob = (ILSMIndexReplicationJob) job;
-                        aJob.endReplication();
-                    }
-                }
-            } else if (job.getOperation() == ReplicationOperation.DELETE) {
-                for (String filePath : job.getJobFiles()) {
-                    remainingFiles--;
-                    asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
-                            IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
-                    ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
-                            ReplicationRequestType.DELETE_FILE);
-
-                    Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
-                    while (iterator.hasNext()) {
-                        Map.Entry<String, SocketChannel> entry = iterator.next();
-                        SocketChannel socketChannel = entry.getValue();
-                        try {
-                            sendRequest(replicasSockets, requestBuffer);
-                            if (asterixFileProperties.requiresAck()) {
-                                waitForResponse(socketChannel, responseBuffer);
+                } else if (job.getOperation() == ReplicationOperation.DELETE) {
+                    for (String filePath : job.getJobFiles()) {
+                        remainingFiles--;
+                        asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
+                                IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
+                        ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+                                ReplicationRequestType.DELETE_FILE);
+
+                        Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
+                        while (iterator.hasNext()) {
+                            Map.Entry<String, SocketChannel> entry = iterator.next();
+                            //if the remote replica is not interested in this partition, skip it.
+                            if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) {
+                                continue;
+                            }
+                            SocketChannel socketChannel = entry.getValue();
+                            try {
+                                sendRequest(replicasSockets, requestBuffer);
+                                if (asterixFileProperties.requiresAck()) {
+                                    waitForResponse(socketChannel, responseBuffer);
+                                }
+                            } catch (IOException e) {
+                                reportFailedReplica(entry.getKey());
+                                iterator.remove();
+                            } finally {
+                                requestBuffer.position(0);
                             }
-                        } catch (IOException e) {
-                            reportFailedReplica(entry.getKey());
-                            iterator.remove();
-                        } finally {
-                            requestBuffer.position(0);
                         }
                     }
                 }
+            } finally {
+                //if sync, close sockets with replicas since they won't be reused
+                if (job.getExecutionType() == ReplicationExecutionType.SYNC) {
+                    closeReplicaSockets(replicasSockets);
+                }
             }
         } finally {
-            //if sync, close sockets with replicas since they wont be reused
-            if (job.getExecutionType() == ReplicationExecutionType.SYNC) {
-                closeReplicaSockets(replicasSockets);
-            }
+            exitReplicatedLSMComponent(job);
+        }
+    }
+
+    private static void exitReplicatedLSMComponent(IReplicationJob job) throws HyracksDataException {
+        if (job.getOperation() == ReplicationOperation.REPLICATE && job instanceof ILSMIndexReplicationJob) {
+            //exit the replicated LSM components
+            ILSMIndexReplicationJob aJob = (ILSMIndexReplicationJob) job;
+            aJob.endReplication();
         }
     }
 
@@ -398,8 +429,7 @@ public class ReplicationManager implements IReplicationManager {
         }
 
         //read response from remote replicas
-        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
-                responseBuffer);
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, responseBuffer);
         return responseFunction;
     }
 
@@ -415,17 +445,7 @@ public class ReplicationManager implements IReplicationManager {
         if (replica.getState() == ReplicaState.ACTIVE) {
             return;
         }
-
         replica.getNode().setClusterIp(replicaNode.getNode().getClusterIp());
-
-        /*
-         * This could be used to reconnect to replica without needing the Cluster notifications
-        if (replica.getState() == ReplicaState.DEAD) {
-            reportFailedReplica(replica.getNode().getId());
-        } else if (replica.getState() == ReplicaState.ACTIVE) {
-            checkReplicaState(replica.getNode().getId(), true);
-        }
-        */
     }
 
     /**
@@ -440,7 +460,7 @@ public class ReplicationManager implements IReplicationManager {
             if (force) {
                 terminateJobsReplication.set(true);
             }
-            replicationJobsQ.offer(replicationJobPoisonPill);
+            replicationJobsQ.offer(REPLICATION_JOB_POISON_PILL);
 
             //wait until the jobs are suspended
             synchronized (jobsReplicationSuspended) {
@@ -504,27 +524,6 @@ public class ReplicationManager implements IReplicationManager {
         }
     }
 
-    @Override
-    public void broadcastNewIPAddress() throws IOException {
-        String orignalIPAddress = replicationProperties.getReplicaIPAddress(nodeId);
-        String newAddress = NetworkingUtil.getHostAddress(hostIPAddressFirstOctet);
-
-        //IP Address didn't change after failure
-        if (orignalIPAddress.equals(newAddress)) {
-            return;
-        }
-
-        Node node = new Node();
-        node.setId(nodeId);
-        node.setClusterIp(newAddress);
-        Replica replica = new Replica(node);
-
-        ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
-        Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
-        sendRequest(replicaSockets, buffer);
-        closeReplicaSockets(replicaSockets);
-    }
-
     /**
      * Sends a shutdown event to remote replicas notifying them
      * no more logs/files will be sent from this local replica.
@@ -536,7 +535,7 @@ public class ReplicationManager implements IReplicationManager {
         node.setId(nodeId);
         node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
         Replica replica = new Replica(node);
-        ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
+        ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_SHUTTING_DOWN);
         ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
@@ -672,9 +671,6 @@ public class ReplicationManager implements IReplicationManager {
 
         if (newState == ReplicaState.ACTIVE) {
             replicationFactor++;
-            //TODO Extra check: make sure newly added replica is in sync.
-            //Since in the current design the whole cluster becomes UNUSABLE,
-            //no new jobs could start before the failed node rejoins
         } else if (newState == ReplicaState.DEAD) {
             if (replicationFactor > INITIAL_REPLICATION_FACTOR) {
                 replicationFactor--;
@@ -873,7 +869,7 @@ public class ReplicationManager implements IReplicationManager {
         //need to stop processing any new logs or jobs
         terminateJobsReplication.set(true);
 
-        ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.FAIL);
+        ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_FAILURE);
         reportReplicaEvent(event);
     }
 
@@ -986,9 +982,10 @@ public class ReplicationManager implements IReplicationManager {
 
     //Recovery Method
     @Override
-    public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
-        ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
-        ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+    public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover,
+            Set<String> existingFiles) throws IOException {
+        ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover, existingFiles);
+        dataBuffer = ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
 
@@ -997,8 +994,7 @@ public class ReplicationManager implements IReplicationManager {
 
             String indexPath;
             String destFilePath;
-            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
-                    dataBuffer);
+            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             LSMIndexFileProperties fileProperties;
             while (responseFunction != ReplicationRequestType.GOODBYE) {
                 dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
@@ -1057,11 +1053,10 @@ public class ReplicationManager implements IReplicationManager {
 
     //Recovery Method
     @Override
-    public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
-            throws IOException, ACIDException {
+    public void requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN,
+            File recoveryLogsFile) throws IOException, ACIDException {
         ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
         dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
-
         try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1069,28 +1064,55 @@ public class ReplicationManager implements IReplicationManager {
             //read response type
             ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
 
-            ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
             ILogRecord logRecord = new LogRecord();
-            while (responseType != ReplicationRequestType.GOODBYE) {
-                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
-                logRecord.readRemoteLog(dataBuffer, true, nodeId);
-
-                if (logRecord.getNodeId().equals(nodeId)) {
-                    //store log in memory to replay it for recovery
-                    recoveryLogs.add(logRecord);
-                    //this needs to be a new log object so that it is passed to recovery manager as a different object
-                    logRecord = new LogRecord();
-                } else {
-                    //send log to log manager as a remote recovery log
-                    logManager.log(logRecord);
+            Set<Integer> nodePartitions = ((PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
+                    .getLocalResourceRepository()).getNodeOrignalPartitions();
+            Set<Integer> nodePartitionsJobs = new HashSet<>();
+            try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "rw");
+                    FileChannel fileChannel = raf.getChannel();) {
+                while (responseType != ReplicationRequestType.GOODBYE) {
+                    dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                    logRecord.readRemoteLog(dataBuffer, true);
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.UPSERT_ENTITY_COMMIT:
+                            if (nodePartitions.contains(logRecord.getResourcePartition())) {
+                                nodePartitionsJobs.add(logRecord.getJobId());
+                                dataBuffer.flip();
+                                while (dataBuffer.hasRemaining()) {
+                                    //store log in temp file to replay it for recovery
+                                    fileChannel.write(dataBuffer);
+                                }
+                            } else {
+                                //send log to log manager as a remote recovery log
+                                logManager.log(logRecord);
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                            if (nodePartitionsJobs.contains(logRecord.getJobId())) {
+                                nodePartitionsJobs.remove(logRecord.getJobId());
+                                dataBuffer.flip();
+                                while (dataBuffer.hasRemaining()) {
+                                    //store log in temp file to replay it for recovery
+                                    fileChannel.write(dataBuffer);
+                                }
+                                break;
+                            }
+                            logManager.log(logRecord);
+                            break;
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                    responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
                 }
-
-                responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
             ReplicationProtocol.sendGoodbye(socketChannel);
-            return recoveryLogs;
         }
     }
 
@@ -1108,13 +1130,14 @@ public class ReplicationManager implements IReplicationManager {
                     event = replicaEventsQ.take();
 
                     switch (event.getEventType()) {
-                        case FAIL:
+                        case NODE_FAILURE:
                             handleReplicaFailure(event.getReplica().getId());
                             break;
-                        case JOIN:
+                        case NODE_JOIN:
+                            updateReplicaInfo(event.getReplica());
                             checkReplicaState(event.getReplica().getId(), false, true);
                             break;
-                        case SHUTDOWN:
+                        case NODE_SHUTTING_DOWN:
                             handleShutdownEvent(event.getReplica().getId());
                             break;
                         default:
@@ -1168,18 +1191,17 @@ public class ReplicationManager implements IReplicationManager {
                     }
 
                     IReplicationJob job = replicationJobsQ.take();
-                    if (job.getOperation() != ReplicationOperation.STOP) {
-                        //if there isn't already a connection, establish a new one
-                        if (replicaSockets == null) {
-                            replicaSockets = getActiveRemoteReplicasSockets();
-                        }
-
-                        processJob(job, replicaSockets, reusableBuffer);
-                    } else {
+                    if (job == REPLICATION_JOB_POISON_PILL) {
                         terminateJobsReplication.set(true);
                         continue;
                     }
 
+                    //if there isn't already a connection, establish a new one
+                    if (replicaSockets == null) {
+                        replicaSockets = getActiveRemoteReplicasSockets();
+                    }
+                    processJob(job, replicaSockets, reusableBuffer);
+
                     //if no more jobs to process, close sockets
                     if (replicationJobsQ.size() == 0) {
                         LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index ee987f8..c905add 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,8 +18,11 @@
  */
 package org.apache.asterix.replication.recovery;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -30,56 +33,62 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogReader;
+import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.service.logging.RemoteLogReader;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
+    private static final String RECOVERY_LOGS_FILE_NAME = "recoveryLogs";
     private final IReplicationManager replicationManager;
-    private final ILogManager logManager;
-    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(RemoteRecoveryManager.class.getName());
     private final IAsterixAppRuntimeContext runtimeContext;
     private final AsterixReplicationProperties replicationProperties;
+    private final static int REMOTE_RECOVERY_JOB_ID = -1;
+    private Map<String, Set<String>> failbackRecoveryReplicas;
 
     public RemoteRecoveryManager(IReplicationManager replicationManager, IAsterixAppRuntimeContext runtimeContext,
             AsterixReplicationProperties replicationProperties) {
         this.replicationManager = replicationManager;
         this.runtimeContext = runtimeContext;
-        this.logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         this.replicationProperties = replicationProperties;
     }
 
     @Override
     public void performRemoteRecovery() {
-        //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
-        //Currently we will not allow a node to perform remote recovery since another replica
-        //already tookover its workload and might not resync correctly if there are on on-going
-        //jobs on the replica.
-        if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
-            throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
-        }
         //The whole remote recovery process should be atomic.
-        //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
-        int maxRecoveryAttempts = 10;
+        //If any error happens, we should restart the recovery from the beginning until it is
+        //complete or an illegal state is reached (cannot recover or max attempts exceeded).
+        int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
         PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
                 .getLocalResourceRepository();
+        IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         while (true) {
             //start recovery steps
             try {
-                maxRecoveryAttempts--;
-
-                if (maxRecoveryAttempts == 0) {
+                if (maxRecoveryAttempts <= 0) {
                     //to avoid infinite loop in case of unexpected behavior.
                     throw new IllegalStateException("Failed to perform remote recovery.");
                 }
 
+                //delete any existing recovery files from previous failed recovery attempts
+                recoveryManager.deleteRecoveryTemporaryFiles();
+
+                //create temporary file to store recovery logs
+                File recoveryLogsFile = recoveryManager.createJobRecoveryFile(REMOTE_RECOVERY_JOB_ID,
+                        RECOVERY_LOGS_FILE_NAME);
+
                 /*** Prepare for Recovery ***/
                 //1. check remote replicas states
                 replicationManager.initializeReplicasState();
@@ -93,8 +102,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
                 datasetLifeCycleManager.closeAllDatasets();
 
-                //3. remove any existing storage data
+                //3. remove any existing storage data and initialize storage metadata
                 resourceRepository.deleteStorageData(true);
+                resourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
 
                 //4. select remote replicas to recover from per lost replica data
                 Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
@@ -110,47 +120,42 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                     String replicaId = remoteReplica.getKey();
                     Set<String> replicasDataToRecover = remoteReplica.getValue();
 
-                    //1. Request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, replicasDataToRecover);
-
-                    //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
-                    if (replicasDataToRecover.contains(logManager.getNodeId())) {
-                        ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
-                                .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
-                        //initialize resource id factor to correct max resource id
-                        runtimeContext.initializeResourceIdFactory();
-                    }
+                    //Request indexes metadata and LSM components
+                    replicationManager.requestReplicaFiles(replicaId, replicasDataToRecover, new HashSet<String>());
 
-                    //3. Get min LSN to start requesting logs from
+                    //Get min LSN to start requesting logs from
                     long minLSN = replicationManager.requestReplicaMinLSN(replicaId);
 
-                    //4. Request remote logs from selected remote replicas
-                    ArrayList<ILogRecord> remoteRecoveryLogs = replicationManager.requestReplicaLogs(replicaId,
-                            replicasDataToRecover, minLSN);
+                    //Request remote logs from selected remote replicas
+                    replicationManager.requestReplicaLogs(replicaId, replicasDataToRecover, minLSN, recoveryLogsFile);
 
-                    //5. Replay remote logs using recovery manager
+                    //Replay remote logs using recovery manager
                     if (replicasDataToRecover.contains(logManager.getNodeId())) {
-                        if (remoteRecoveryLogs.size() > 0) {
-                            runtimeContext.getTransactionSubsystem().getRecoveryManager()
-                                    .replayRemoteLogs(remoteRecoveryLogs);
+                        //replay logs for local partitions only
+                        Set<Integer> nodePartitions = resourceRepository.getNodeOrignalPartitions();
+                        try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "r");
+                                FileChannel fileChannel = raf.getChannel();) {
+                            ILogReader logReader = new RemoteLogReader(fileChannel, fileChannel.size(),
+                                    logManager.getLogPageSize());
+                            recoveryManager.replayPartitionsLogs(nodePartitions, logReader, 0);
                         }
-                        remoteRecoveryLogs.clear();
                     }
                 }
-
                 LOGGER.log(Level.INFO, "Completed remote recovery successfully!");
                 break;
             } catch (Exception e) {
                 e.printStackTrace();
                 LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
+                maxRecoveryAttempts--;
             }
         }
     }
 
     private Map<String, Set<String>> constructRemoteRecoveryPlan() {
         //1. identify which replicas reside in this node
-        String localNodeId = logManager.getNodeId();
-        Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
+        String localNodeId = runtimeContext.getTransactionSubsystem().getId();
+
+        Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
 
         Map<String, Set<String>> recoveryCandidates = new HashMap<String, Set<String>>();
         Map<String, Integer> candidatesScore = new HashMap<String, Integer>();
@@ -187,7 +192,6 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         //3. find best candidate to recover from per lost replica data
         for (Entry<String, Set<String>> entry : recoveryCandidates.entrySet()) {
-
             int winnerScore = -1;
             String winner = "";
             for (String node : entry.getValue()) {
@@ -214,12 +218,133 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
     }
 
     @Override
-    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
-        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
-        //reply logs > minLSN that belong to these partitions
-        //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
-        //this needs to be updated once log formats are updated to include the partition id
-        runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
-                failedNode);
+    public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
+        /**
+         * TODO even though the takeover is always expected to succeed,
+         * in case of any failure during the takeover, the CC should be
+         * notified that the takeover failed.
+         */
+        Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
+        ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
+
+        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitionsToTakeover);
+        long readableSmallestLSN = logManager.getReadableSmallestLSN();
+        if (minLSN < readableSmallestLSN) {
+            minLSN = readableSmallestLSN;
+        }
+
+        //replay logs > minLSN that belong to these partitions
+        IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        recoveryManager.replayPartitionsLogs(partitionsToTakeover, logManager.getLogReader(true), minLSN);
+
+        //mark these partitions as active in this node
+        PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                .getLocalResourceRepository();
+        for (Integer patitionId : partitions) {
+            resourceRepository.addActivePartition(patitionId);
+        }
+    }
+
+    @Override
+    public void startFailbackProcess() {
+        int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
+        PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                .getLocalResourceRepository();
+        IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
+
+        failbackRecoveryReplicas = new HashMap<>();
+        while (true) {
+            //start recovery steps
+            try {
+                if (maxRecoveryAttempts <= 0) {
+                    //to avoid infinite loop in case of unexpected behavior.
+                    throw new IllegalStateException("Failed to perform remote recovery.");
+                }
+
+                /*** Prepare for Recovery ***/
+                //1. check remote replicas states
+                replicationManager.initializeReplicasState();
+                int activeReplicasCount = replicationManager.getActiveReplicasCount();
+
+                if (activeReplicasCount == 0) {
+                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
+                }
+
+                //2. clean any memory data that could've existed from previous failed recovery attempt
+                datasetLifeCycleManager.closeAllDatasets();
+
+                //3. remove any existing storage data and initialize storage metadata
+                resourceRepository.deleteStorageData(true);
+                resourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
+
+                //4. select remote replicas to recover from per lost replica data
+                failbackRecoveryReplicas = constructRemoteRecoveryPlan();
+
+                /*** Start Recovery Per Lost Replica ***/
+                for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) {
+                    String replicaId = remoteReplica.getKey();
+                    Set<String> partitionsToRecover = remoteReplica.getValue();
+
+                    //1. Request indexes metadata and LSM components
+                    replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, new HashSet<String>());
+                }
+                break;
+            } catch (IOException e) {
+                e.printStackTrace();
+                LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
+                maxRecoveryAttempts--;
+            }
+        }
+    }
+
+    @Override
+    public void completeFailbackProcess() throws IOException {
+        ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
+        ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
+                .getReplicaResourcesManager();
+        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMetadataProperties().getNodePartitions();
+
+        /**
+         * for each lost partition, get the remaining files from replicas
+         * to complete the failback process.
+         */
+        try {
+            for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) {
+                String replicaId = remoteReplica.getKey();
+                Set<String> NCsDataToRecover = remoteReplica.getValue();
+                Set<String> existingFiles = new HashSet<>();
+                for (String nodeId : NCsDataToRecover) {
+                    //get partitions that will be recovered from this node
+                    ClusterPartition[] replicaPartitions = nodePartitions.get(nodeId);
+                    for (ClusterPartition partition : replicaPartitions) {
+                        existingFiles.addAll(
+                                replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), true));
+                    }
+                }
+
+                //Request remaining indexes files
+                replicationManager.requestReplicaFiles(replicaId, NCsDataToRecover, existingFiles);
+            }
+        } catch (IOException e) {
+            /**
+             * in case of failure during failback completion process we need to construct a new plan
+             * and get all the files from the start since the remote replicas will change in the new plan.
+             */
+            e.printStackTrace();
+            startFailbackProcess();
+        }
+
+        //get max LSN from selected remote replicas
+        long maxRemoteLSN = replicationManager.getMaxRemoteLSN(failbackRecoveryReplicas.keySet());
+
+        //6. force LogManager to start from a partition > maxLSN in selected remote replicas
+        logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
+
+        //start replication service after failback completed
+        runtimeContext.getReplicationChannel().start();
+        runtimeContext.getReplicationManager().startReplicationThreads();
+
+        failbackRecoveryReplicas = null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 890d3a2..031aeb6 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -69,11 +69,7 @@ public class LSMIndexFileProperties {
         this.fileName = tokens[arraySize - 1];
         this.idxName = tokens[arraySize - 2];
         this.dataverse = tokens[arraySize - 3];
-        this.partition = getPartitonNumFromName(tokens[arraySize - 4]);
-    }
-
-    private static int getPartitonNumFromName(String name) {
-        return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length()));
+        this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]);
     }
 
     public void serialize(OutputStream out) throws IOException {
@@ -114,10 +110,6 @@ public class LSMIndexFileProperties {
         return nodeId;
     }
 
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
     public String getDataverse() {
         return dataverse;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index b9f7506..55d442d 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -55,7 +54,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
     public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
     public static final long REPLICA_INDEX_CREATION_LSN = -1;
-    private final AtomicLong lastMinRemoteLSN;
     private final PersistentLocalResourceRepository localRepository;
     private final Map<String, ClusterPartition[]> nodePartitions;
 
@@ -63,7 +61,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
             AsterixMetadataProperties metadataProperties) {
         this.localRepository = (PersistentLocalResourceRepository) localRepository;
         nodePartitions = metadataProperties.getNodePartitions();
-        lastMinRemoteLSN = new AtomicLong(-1);
     }
 
     public void deleteIndexFile(LSMIndexFileProperties afp) {
@@ -126,7 +123,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
 
         //update map on disk
         updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap);
-
     }
 
     public Set<File> getReplicaIndexes(String replicaId) {
@@ -139,35 +135,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
     }
 
     @Override
-    public long getMinRemoteLSN(Set<String> replicaIds) {
-        if (lastMinRemoteLSN.get() != -1) {
-            return lastMinRemoteLSN.get();
-        }
-        long minRemoteLSN = Long.MAX_VALUE;
-        for (String replica : replicaIds) {
-            //for every index in replica
-            Set<File> remoteIndexes = getReplicaIndexes(replica);
-            for (File indexFolder : remoteIndexes) {
-                //read LSN map
-                try {
-                    //get max LSN per index
-                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
-
-                    //get min of all maximums
-                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                } catch (IOException e) {
-                    LOGGER.log(Level.INFO,
-                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
-                    continue;
-                }
-            }
-        }
-        lastMinRemoteLSN.set(minRemoteLSN);
-        return minRemoteLSN;
-    }
-
-    @Override
-    public long getPartitionsMinLSN(Integer[] partitions) {
+    public long getPartitionsMinLSN(Set<Integer> partitions) {
         long minRemoteLSN = Long.MAX_VALUE;
         for (Integer partition : partitions) {
             //for every index in replica
@@ -219,7 +187,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
                 remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
             }
         }
-
         return remoteIndexMaxLSN;
     }
 
@@ -271,7 +238,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
                 ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(lsnMap);
             oosToFos.flush();
-            lastMinRemoteLSN.set(-1);
         }
     }
 
@@ -293,7 +259,9 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
                         File[] indexFileList = dataverseFile.listFiles();
                         if (indexFileList != null) {
                             for (File indexFile : indexFileList) {
-                                partitionIndexes.add(indexFile);
+                                if (indexFile.isDirectory()) {
+                                    partitionIndexes.add(indexFile);
+                                }
                             }
                         }
                     }
@@ -307,7 +275,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
      * @param partition
      * @return Absolute paths to all partition files
      */
-    public List<String> getPartitionIndexesFiles(int partition) {
+    public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) {
         List<String> partitionFiles = new ArrayList<String>();
         Set<File> partitionIndexes = getPartitionIndexes(partition);
         for (File indexDir : partitionIndexes) {
@@ -315,7 +283,12 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
                 File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
                 if (indexFiles != null) {
                     for (File file : indexFiles) {
-                        partitionFiles.add(file.getAbsolutePath());
+                        if (!relativePath) {
+                            partitionFiles.add(file.getAbsolutePath());
+                        } else {
+                            partitionFiles.add(
+                                    PersistentLocalResourceRepository.getResourceRelativePath(file.getAbsolutePath()));
+                        }
                     }
                 }
             }
@@ -324,18 +297,21 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
     }
 
     private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index f35f4d6..97683d5 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -75,7 +75,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
-            ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+            ARecordType recordType, int filterFieldIndex) {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
         // initialize nullWriter
         this.nullWriter = opDesc.getNullWriterFactory().createNullWriter();
@@ -125,7 +125,8 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
+                    lsmIndex, ctx);
 
             indexAccessor = lsmIndex.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 3b5630f..65c9a49 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -45,7 +45,7 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
 
     protected AbstractIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            byte resourceType, IndexOperation indexOp) {
+            int resourcePartition, byte resourceType, IndexOperation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager);
         this.resourceId = resourceId;
         this.resourceType = resourceType;
@@ -58,8 +58,8 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         logRecord.setJobId(txnCtx.getJobId().getId());
         logRecord.setDatasetId(datasetId);
         logRecord.setResourceId(resourceId);
+        logRecord.setResourcePartition(resourcePartition);
         logRecord.setNewOp((byte) (indexOp.ordinal()));
-        logRecord.setNodeId(txnSubsystem.getId());
     }
 
     protected void log(int PKHash, ITupleReference newValue) throws ACIDException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 3c34153..780f294 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -37,9 +37,10 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
         implements IModificationOperationCallback {
 
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
-            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType,
-            IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourceType, indexOp);
+            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
+            byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+                resourceType, indexOp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 794f867..db68b26 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -37,8 +37,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 /**
  * Assumes LSM-BTrees as primary indexes.
  */
-public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
-        IModificationOperationCallbackFactory {
+public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
@@ -51,7 +51,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
@@ -64,8 +64,8 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
-                    indexOp);
+                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+                    resourcePartition, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 250e28d..8044d90 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -40,8 +40,9 @@ public class SecondaryIndexModificationOperationCallback extends AbstractIndexMo
 
     public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            byte resourceType, IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourceType, indexOp);
+            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+                resourceType, indexOp);
         oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
     }
 


Mime
View raw message