asterixdb-commits mailing list archives

From mhub...@apache.org
Subject [1/6] incubator-asterixdb git commit: Asterix NCs Failback Support
Date Thu, 18 Feb 2016 09:54:27 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master c318249ef -> 98d38e6a0


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 11dc282..4a17541 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -49,19 +49,18 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -71,25 +70,21 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 /**
  * This is the Recovery Manager and is responsible for rolling back a
- * transaction as well as doing a system recovery. TODO: Crash Recovery logic is
- * not in place completely. Once we have physical logging implemented, we would
- * add support for crash recovery.
+ * transaction as well as doing a system recovery.
  */
 public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
-    public static final boolean IS_DEBUG_MODE = false;//true
+    public static final boolean IS_DEBUG_MODE = false;
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
     private final TransactionSubsystem txnSubsystem;
     private final LogManager logMgr;
@@ -101,6 +96,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private static final long MEGABYTE = 1024L * 1024L;
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB;
+    private final PersistentLocalResourceRepository localResourceRepository;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -111,11 +107,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     public RecoveryManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
-        this.logMgr = (LogManager) txnSubsystem.getLogManager();
-        this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
+        logMgr = (LogManager) txnSubsystem.getLogManager();
+        checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
         IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
                 .getAsterixAppRuntimeContextProvider().getAppContext();
-        this.replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
+        replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
+        localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getLocalResourceRepository();
     }
 
     /**
@@ -160,12 +158,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             long readableSmallestLSN = logMgr.getReadableSmallestLSN();
             if (logMgr.getAppendLSN() == readableSmallestLSN) {
                 if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("[Warning] ---------------------------------------------------");
-                        LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
-                        LOGGER.info("[Warning] ---------------------------------------------------");
-                        //No choice but continuing when the log files are lost.
-                    }
+                    LOGGER.warning("Some (or all) of the transaction log files are lost.");
+                    //No choice but continuing when the log files are lost.
                 }
                 state = SystemState.HEALTHY;
                 return state;
@@ -180,412 +174,227 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
-    //This method is used only when replication is disabled. Therefore, there is no need to check logs node ids
+    //This method is used only when replication is disabled.
     @Override
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
-        //delete any recovery files from previous failed recovery attempts
-        deleteRecoveryTemporaryFiles();
-
-        int updateLogCount = 0;
-        int entityCommitLogCount = 0;
-        int jobCommitLogCount = 0;
-        int redoCount = 0;
-        int abortLogCount = 0;
-        int jobId = -1;
-
         state = SystemState.RECOVERING;
-        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
-
-        Set<Integer> winnerJobSet = new HashSet<Integer>();
-        jobId2WinnerEntitiesMap = new HashMap<>();
+        LOGGER.log(Level.INFO, "starting recovery ...");
 
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-        JobEntityCommits jobEntityWinners = null;
-        //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         CheckpointObject checkpointObject = readCheckpoint();
         long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
         if (lowWaterMarkLSN < readableSmallestLSN) {
             lowWaterMarkLSN = readableSmallestLSN;
         }
-        int maxJobId = checkpointObject.getMaxJobId();
 
-        //-------------------------------------------------------------------------
-        //  [ analysis phase ]
-        //  - collect all committed Lsn
-        //-------------------------------------------------------------------------
-        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
-
-        //#. set log reader to the lowWaterMarkLsn
-        ILogReader logReader = logMgr.getLogReader(true);
-        ILogRecord logRecord = null;
-        try {
-            logReader.initializeScan(lowWaterMarkLSN);
-            logRecord = logReader.next();
-            while (logRecord != null) {
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
-                }
-                //update max jobId
-                if (logRecord.getJobId() > maxJobId) {
-                    maxJobId = logRecord.getJobId();
-                }
-                switch (logRecord.getLogType()) {
-                    case LogType.UPDATE:
-                        updateLogCount++;
-                        break;
-                    case LogType.JOB_COMMIT:
-                        jobId = logRecord.getJobId();
-                        winnerJobSet.add(jobId);
-                        if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                            //to delete any spilled files as well
-                            jobEntityWinners.clear();
-                            jobId2WinnerEntitiesMap.remove(jobId);
-                        }
-                        jobCommitLogCount++;
-                        break;
-                    case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
-                        jobId = logRecord.getJobId();
-                        if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                            jobEntityWinners = new JobEntityCommits(jobId);
-                            if (needToFreeMemory()) {
-                                //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
-                                //This could happen only when we have many jobs with small number of records and none of them have job commit.
-                                freeJobsCachedEntities(jobId);
-                            }
-                            jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
-                        } else {
-                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                        }
-                        jobEntityWinners.add(logRecord);
-                        entityCommitLogCount++;
-                        break;
-                    case LogType.ABORT:
-                        abortLogCount++;
-                        break;
-                    case LogType.FLUSH:
-                        break;
-                    default:
-                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                }
-                logRecord = logReader.next();
-            }
-
-            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
-            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
-                winners.prepareForSearch();
-            }
-            //-------------------------------------------------------------------------
-            //  [ redo phase ]
-            //  - redo if
-            //    1) The TxnId is committed && --> guarantee durability
-            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
-            //-------------------------------------------------------------------------
-            LOGGER.info("[RecoveryMgr] in redo phase");
-
-            long resourceId;
-            long maxDiskLastLsn;
-            long LSN = -1;
-            ILSMIndex index = null;
-            LocalResource localResource = null;
-            ILocalResourceMetadata localResourceMetadata = null;
-            Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
-            boolean foundWinner = false;
-
-            IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-            //get datasetLifeCycleManager
-            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                    .getDatasetLifecycleManager();
-            ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-            Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
-                    .loadAndGetAllResources();
-            //set log reader to the lowWaterMarkLsn again.
-            logReader.initializeScan(lowWaterMarkLSN);
-            logRecord = logReader.next();
-            while (logRecord != null) {
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
-                }
-                LSN = logRecord.getLSN();
-                jobId = logRecord.getJobId();
-                foundWinner = false;
-                switch (logRecord.getLogType()) {
-                    case LogType.UPDATE:
-                        if (winnerJobSet.contains(jobId)) {
-                            foundWinner = true;
-                        } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                    logRecord.getPKValue(), logRecord.getPKValueSize());
-                            if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
-                                foundWinner = true;
-                            }
-                        }
-                        if (foundWinner) {
-                            resourceId = logRecord.getResourceId();
-                            localResource = resourcesMap.get(resourceId);
-                            /*******************************************************************
-                             * [Notice]
-                             * -> Issue
-                             * Delete index may cause a problem during redo.
-                             * The index operation to be redone couldn't be redone because the corresponding index
-                             * may not exist in NC due to the possible index drop DDL operation.
-                             * -> Approach
-                             * Avoid the problem during redo.
-                             * More specifically, the problem will be detected when the localResource of
-                             * the corresponding index is retrieved, which will end up with 'null'.
-                             * If null is returned, then just go and process the next
-                             * log record.
-                             *******************************************************************/
-                            if (localResource == null) {
-                                logRecord = logReader.next();
-                                continue;
-                            }
-                            /*******************************************************************/
-
-                            //get index instance from IndexLifeCycleManager
-                            //if index is not registered into IndexLifeCycleManager,
-                            //create the index using LocalMetadata stored in LocalResourceRepository
-                            String resourceAbsolutePath = localResource.getResourcePath();
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
-                            if (index == null) {
-                                //#. create index instance and register to indexLifeCycleManager
-                                localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
-                                index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(resourceAbsolutePath, index);
-                                datasetLifecycleManager.open(resourceAbsolutePath);
-
-                                //#. get maxDiskLastLSN
-                                ILSMIndex lsmIndex = index;
-                                maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                        .getComponentLSN(lsmIndex.getImmutableComponents());
-
-                                //#. set resourceId and maxDiskLastLSN to the map
-                                resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
-                            } else {
-                                maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
-                            }
-
-                            if (LSN > maxDiskLastLsn) {
-                                redo(logRecord, datasetLifecycleManager);
-                                redoCount++;
-                            }
-                        }
-                        break;
-                    case LogType.JOB_COMMIT:
-                    case LogType.ENTITY_COMMIT:
-                    case LogType.ABORT:
-                    case LogType.FLUSH:
-                    case LogType.UPSERT_ENTITY_COMMIT:
-                        //do nothing
-                        break;
-                    default:
-                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                }
-                logRecord = logReader.next();
-            }
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
 
-            //close all indexes
-            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
-            for (long r : resourceIdList) {
-                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
-            }
+        //get active partitions on this node
+        Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions();
+        replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+    }
 
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("[RecoveryMgr] recovery is completed.");
-                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
-                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
-                        + "/" + redoCount);
-            }
+    @Override
+    public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
+            throws IOException, ACIDException {
+        try {
+            Set<Integer> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
+            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
         } finally {
             logReader.close();
-            //delete any recovery files after completing recovery
             deleteRecoveryTemporaryFiles();
         }
     }
 
-    private static boolean needToFreeMemory() {
-        return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
-    }
-
-    @Override
-    public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException {
+    private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader,
+            long lowWaterMarkLSN) throws IOException, ACIDException {
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
         int jobCommitLogCount = 0;
-        int redoCount = 0;
         int abortLogCount = 0;
         int jobId = -1;
 
-        state = SystemState.RECOVERING;
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        jobId2WinnerEntitiesMap = new HashMap<>();
 
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] starting recovery ...");
-        }
+        //set log reader to the lowWaterMarkLsn
+        ILogRecord logRecord = null;
+        logReader.initializeScan(lowWaterMarkLSN);
 
-        Set<Integer> winnerJobSet = new HashSet<Integer>();
-        Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
-        //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
-        Set<TxnId> winnerEntitySet = null;
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-        TxnId winnerEntity = null;
-
-        //-------------------------------------------------------------------------
-        //  [ analysis phase ]
-        //  - collect all committed Lsn
-        //-------------------------------------------------------------------------
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] in analysis phase");
-        }
+        //collect all committed Lsn
+        JobEntityCommits jobEntityWinners = null;
 
-        String nodeId = logMgr.getNodeId();
-        ILogRecord logRecord;
-        for (int i = 0; i < remoteLogs.size(); i++) {
-            logRecord = remoteLogs.get(i);
+        logRecord = logReader.next();
+        while (logRecord != null) {
             if (IS_DEBUG_MODE) {
                 LOGGER.info(logRecord.getLogRecordForDisplay());
             }
-
-            if (logRecord.getNodeId().equals(nodeId)) {
-                //update max jobId
-                switch (logRecord.getLogType()) {
-                    case LogType.UPDATE:
+            switch (logRecord.getLogType()) {
+                case LogType.UPDATE:
+                    if (partitions.contains(logRecord.getResourcePartition())) {
                         updateLogCount++;
-                        break;
-                    case LogType.JOB_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
-                        winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
-                        jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
-                        jobCommitLogCount++;
-                        break;
-                    case LogType.ENTITY_COMMIT:
+                    }
+                    break;
+                case LogType.JOB_COMMIT:
+                    jobId = logRecord.getJobId();
+                    winnerJobSet.add(jobId);
+                    if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                        jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                        //to delete any spilled files as well
+                        jobEntityWinners.clear();
+                        jobId2WinnerEntitiesMap.remove(jobId);
+                    }
+                    jobCommitLogCount++;
+                    break;
+                case LogType.ENTITY_COMMIT:
+                case LogType.UPSERT_ENTITY_COMMIT:
+                    if (partitions.contains(logRecord.getResourcePartition())) {
                         jobId = logRecord.getJobId();
-                        winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                logRecord.getPKValue(), logRecord.getPKValueSize(), true);
-                        if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
-                            winnerEntitySet = new HashSet<TxnId>();
-                            jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
+                        if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                            jobEntityWinners = new JobEntityCommits(jobId);
+                            if (needToFreeMemory()) {
+                                //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+                                //This could happen only when we have many jobs with small number of records and none of them have job commit.
+                                freeJobsCachedEntities(jobId);
+                            }
+                            jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
                         } else {
-                            winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
                         }
-                        winnerEntitySet.add(winnerEntity);
+                        jobEntityWinners.add(logRecord);
                         entityCommitLogCount++;
-                        break;
-                    case LogType.ABORT:
-                        abortLogCount++;
-                        break;
-                    case LogType.FLUSH:
-                        break;
-                    default:
-                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                }
+                    }
+                    break;
+                case LogType.ABORT:
+                    abortLogCount++;
+                    break;
+                case LogType.FLUSH:
+                    break;
+                default:
+                    throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
+            logRecord = logReader.next();
         }
 
-        //-------------------------------------------------------------------------
-        //  [ redo phase ]
-        //  - redo if
-        //    1) The TxnId is committed && --> guarantee durability
-        //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
-        //-------------------------------------------------------------------------
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] in redo phase");
+        //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+        for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+            winners.prepareForSearch();
         }
 
+        LOGGER.info("Logs analysis phase completed.");
+        LOGGER.info("Analysis log count update/entityCommit/jobCommit/abort = " + updateLogCount + "/"
+                + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount);
+
+        return winnerJobSet;
+    }
+
+    private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
+            Set<Integer> winnerJobSet) throws IOException, ACIDException {
+        int redoCount = 0;
+        int jobId = -1;
+
         long resourceId;
         long maxDiskLastLsn;
         long LSN = -1;
         ILSMIndex index = null;
         LocalResource localResource = null;
         ILocalResourceMetadata localResourceMetadata = null;
-        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
         boolean foundWinner = false;
+        JobEntityCommits jobEntityWinners = null;
 
-        //#. get indexLifeCycleManager
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
-        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
-                .getLocalResourceRepository();
+
         Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
-        //#. set log reader to the lowWaterMarkLsn again.
-        for (int i = 0; i < remoteLogs.size(); i++) {
-            logRecord = remoteLogs.get(i);
-            if (IS_DEBUG_MODE) {
-                LOGGER.info(logRecord.getLogRecordForDisplay());
-            }
-            if (logRecord.getNodeId().equals(nodeId)) {
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+
+        ILogRecord logRecord = null;
+        try {
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
                 LSN = logRecord.getLSN();
                 jobId = logRecord.getJobId();
                 foundWinner = false;
                 switch (logRecord.getLogType()) {
                     case LogType.UPDATE:
-                        if (winnerJobSet.contains(Integer.valueOf(jobId))) {
-                            foundWinner = true;
-                        } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
-                            winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
-                            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                    logRecord.getPKValue(), logRecord.getPKValueSize());
-                            if (winnerEntitySet.contains(tempKeyTxnId)) {
+                        if (partitions.contains(logRecord.getResourcePartition())) {
+                            if (winnerJobSet.contains(jobId)) {
                                 foundWinner = true;
+                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+                                    foundWinner = true;
+                                }
                             }
-                        }
-                        if (foundWinner) {
-                            resourceId = logRecord.getResourceId();
-                            localResource = resourcesMap.get(resourceId);
-
-                            /*******************************************************************
-                             * [Notice]
-                             * -> Issue
-                             * Delete index may cause a problem during redo.
-                             * The index operation to be redone couldn't be redone because the corresponding index
-                             * may not exist in NC due to the possible index drop DDL operation.
-                             * -> Approach
-                             * Avoid the problem during redo.
-                             * More specifically, the problem will be detected when the localResource of
-                             * the corresponding index is retrieved, which will end up with 'null'.
-                             * If null is returned, then just go and process the next
-                             * log record.
-                             *******************************************************************/
-                            if (localResource == null) {
-                                continue;
-                            }
-                            /*******************************************************************/
-
-                            //get index instance from IndexLifeCycleManager
-                            //if index is not registered into IndexLifeCycleManager,
-                            //create the index using LocalMetadata stored in LocalResourceRepository
-                            //get partition path in this node
-                            String partitionIODevicePath = localResourceRepository
-                                    .getPartitionPath(localResource.getPartition());
-                            String resourceAbsolutePath = partitionIODevicePath + File.separator
-                                    + localResource.getResourceName();
-                            localResource.setResourcePath(resourceAbsolutePath);
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
-                            if (index == null) {
-                                //#. create index instance and register to indexLifeCycleManager
-                                localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
-                                index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(resourceAbsolutePath, index);
-                                datasetLifecycleManager.open(resourceAbsolutePath);
-
-                                //#. get maxDiskLastLSN
-                                ILSMIndex lsmIndex = index;
-                                maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                        .getComponentLSN(lsmIndex.getImmutableComponents());
-
-                                //#. set resourceId and maxDiskLastLSN to the map
-                                resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
-                            } else {
-                                maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
-                            }
+                            if (foundWinner) {
+                                resourceId = logRecord.getResourceId();
+                                localResource = resourcesMap.get(resourceId);
+                                /*******************************************************************
+                                 * [Notice]
+                                 * -> Issue
+                                 * Delete index may cause a problem during redo.
+                                 * The index operation to be redone couldn't be redone because the corresponding index
+                                 * may not exist in NC due to the possible index drop DDL operation.
+                                 * -> Approach
+                                 * Avoid the problem during redo.
+                                 * More specifically, the problem will be detected when the localResource of
+                                 * the corresponding index is retrieved, which will end up with 'null'.
+                                 * If null is returned, then just go and process the next
+                                 * log record.
+                                 *******************************************************************/
+                                if (localResource == null) {
+                                    logRecord = logReader.next();
+                                    continue;
+                                }
+                                /*******************************************************************/
+
+                                //get index instance from IndexLifeCycleManager
+                                //if index is not registered into IndexLifeCycleManager,
+                                //create the index using LocalMetadata stored in LocalResourceRepository
+                                //get partition path in this node
+                                String partitionIODevicePath = localResourceRepository
+                                        .getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                        + localResource.getResourceName();
+                                localResource.setResourcePath(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                if (index == null) {
+                                    //#. create index instance and register to indexLifeCycleManager
+                                    localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                            resourceAbsolutePath, localResource.getPartition());
+                                    datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                    datasetLifecycleManager.open(resourceAbsolutePath);
 
-                            if (LSN > maxDiskLastLsn) {
-                                redo(logRecord, datasetLifecycleManager);
-                                redoCount++;
+                                    //#. get maxDiskLastLSN
+                                    ILSMIndex lsmIndex = index;
+                                    try {
+                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+                                                .getIOOperationCallback())
+                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+                                    } catch (HyracksDataException e) {
+                                        datasetLifecycleManager.close(resourceAbsolutePath);
+                                        throw e;
+                                    }
+
+                                    //#. set resourceId and maxDiskLastLSN to the map
+                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
+
+                                if (LSN > maxDiskLastLsn) {
+                                    redo(logRecord, datasetLifecycleManager);
+                                    redoCount++;
+                                }
                             }
                         }
                         break;
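
For context on the analysis/redo split refactored above: the analysis phase collects the jobs (and per-job entity commits) that reached a commit record, and the redo phase replays only UPDATE records that belong to such a winner and whose LSN is newer than the maxDiskLastLsn already persisted by the target index. A minimal toy sketch of that decision follows; the types and values are illustrative only, not the Asterix log format or API.

    import java.util.*;

    class RedoSketch {
        // Toy log record (illustrative only; not the Asterix ILogRecord).
        record LogRec(String type, int jobId, long lsn, long resourceId) {}

        public static void main(String[] args) {
            List<LogRec> log = List.of(
                    new LogRec("UPDATE", 1, 10, 100),
                    new LogRec("UPDATE", 2, 11, 100),   // job 2 never commits -> loser, never redone
                    new LogRec("JOB_COMMIT", 1, 12, -1),
                    new LogRec("UPDATE", 1, 13, 100));

            // Analysis phase: collect winner (committed) jobs.
            Set<Integer> winners = new HashSet<>();
            for (LogRec r : log) {
                if (r.type().equals("JOB_COMMIT")) {
                    winners.add(r.jobId());
                }
            }

            // Redo phase: replay winner UPDATEs whose LSN is newer than what the index already persisted.
            Map<Long, Long> maxDiskLastLsn = Map.of(100L, 10L); // pretend LSN 10 is already on disk for resource 100
            for (LogRec r : log) {
                if (r.type().equals("UPDATE") && winners.contains(r.jobId())
                        && r.lsn() > maxDiskLastLsn.getOrDefault(r.resourceId(), -1L)) {
                    System.out.println("redo LSN " + r.lsn()); // prints only: redo LSN 13
                }
            }
        }
    }

In the sketch only LSN 13 is redone: LSN 10 is already reflected on disk (idempotence) and job 2 never committed (durability), mirroring the two conditions noted in the removed phase comments.
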
@@ -594,28 +403,25 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.UPSERT_ENTITY_COMMIT:
-
                         //do nothing
                         break;
-
                     default:
                         throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                 }
+                logRecord = logReader.next();
+            }
+            LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount);
+        } finally {
+            //close all indexes
+            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+            for (long r : resourceIdList) {
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
             }
         }
+    }
 
-        //close all indexes
-        Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
-        for (long r : resourceIdList) {
-            datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] remote recovery is completed.");
-            LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
-                    + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/"
-                    + redoCount);
-        }
+    private static boolean needToFreeMemory() {
+        return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
     }
 
     @Override
@@ -653,7 +459,22 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         //get min LSN of dead replicas remote resources
                         IReplicaResourcesManager remoteResourcesManager = txnSubsystem
                                 .getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
-                        minMCTFirstLSN = remoteResourcesManager.getMinRemoteLSN(deadReplicaIds);
+                        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
+                                .getAsterixAppRuntimeContextProvider().getAppContext();
+                        AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
+                        Set<Integer> deadReplicasPartitions = new HashSet<>();
+                        //get partitions of the dead replicas that are not active on this node
+                        for (String deadReplicaId : deadReplicaIds) {
+                            ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions()
+                                    .get(deadReplicaId);
+                            for (ClusterPartition partition : nodePartitons) {
+                                if (!localResourceRepository.getActivePartitions()
+                                        .contains(partition.getPartitionId())) {
+                                    deadReplicasPartitions.add(partition.getPartitionId());
+                                }
+                            }
+                        }
+                        minMCTFirstLSN = remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
                     }
                 } else {
                     //start up complete checkpoint. Avoid deleting remote recovery logs.
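
The checkpoint path above now derives the minimum LSN from the partitions of dead replicas that are not currently active on this node, instead of from whole replica ids. A small sketch of that set computation with plain collections; the cluster layout and names are made up for illustration and are not the real AsterixMetadataProperties/ClusterPartition API.

    import java.util.*;

    class DeadReplicaPartitionsSketch {
        public static void main(String[] args) {
            // Hypothetical cluster layout: node id -> partitions it owns.
            Map<String, int[]> nodePartitions = Map.of(
                    "nc1", new int[] { 0, 1 },
                    "nc2", new int[] { 2, 3 });

            Set<Integer> activeOnThisNode = Set.of(0, 1, 2); // own partitions plus one already taken over
            Set<String> deadReplicas = Set.of("nc2");

            // Partitions of dead replicas that are not active locally still need their remote minimum LSN.
            Set<Integer> deadReplicaPartitions = new HashSet<>();
            for (String dead : deadReplicas) {
                for (int p : nodePartitions.get(dead)) {
                    if (!activeOnThisNode.contains(p)) {
                        deadReplicaPartitions.add(p);
                    }
                }
            }
            System.out.println(deadReplicaPartitions); // prints [3]
        }
    }
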
@@ -683,31 +504,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
                 txnMgr.getMaxJobId(), System.currentTimeMillis(), isSharpCheckpoint);
 
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-        try {
-            String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
-            fos = new FileOutputStream(fileName);
-            oosToFos = new ObjectOutputStream(fos);
+        String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
+
+        try (FileOutputStream fos = new FileOutputStream(fileName);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(checkpointObject);
             oosToFos.flush();
         } catch (IOException e) {
             throw new ACIDException("Failed to checkpoint", e);
-        } finally {
-            if (oosToFos != null) {
-                try {
-                    oosToFos.close();
-                } catch (IOException e) {
-                    throw new ACIDException("Failed to checkpoint", e);
-                }
-            }
-            if (oosToFos == null && fos != null) {
-                try {
-                    fos.close();
-                } catch (IOException e) {
-                    throw new ACIDException("Failed to checkpoint", e);
-                }
-            }
         }
 
         //#. delete the previous checkpoint files
@@ -735,7 +539,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             logMgr.deleteOldLogFiles(minMCTFirstLSN);
         }
 
-        if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
+        if (isSharpCheckpoint) {
             LOGGER.info("Completed sharp checkpoint.");
         }
 
@@ -779,15 +583,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     private long getRemoteMinFirstLSN() {
-        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
-                .getAsterixAppRuntimeContextProvider().getAppContext();
-
-        Set<String> replicaIds = propertiesProvider.getReplicationProperties()
-                .getRemoteReplicasIds(txnSubsystem.getId());
         IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getAppContext().getReplicaResourcesManager();
-
-        return remoteResourcesManager.getMinRemoteLSN(replicaIds);
+        long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
+        return minRemoteLSN;
     }
 
     private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
@@ -801,38 +600,19 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>();
         for (File file : prevCheckpointFiles) {
-            FileInputStream fis = null;
-            ObjectInputStream oisFromFis = null;
-
-            try {
-                fis = new FileInputStream(file);
-                oisFromFis = new ObjectInputStream(fis);
+            try (FileInputStream fis = new FileInputStream(file);
+                    ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
                 checkpointObject = (CheckpointObject) oisFromFis.readObject();
                 checkpointObjectList.add(checkpointObject);
             } catch (Exception e) {
                 throw new ACIDException("Failed to read a checkpoint file", e);
-            } finally {
-                if (oisFromFis != null) {
-                    try {
-                        oisFromFis.close();
-                    } catch (IOException e) {
-                        throw new ACIDException("Failed to read a checkpoint file", e);
-                    }
-                }
-                if (oisFromFis == null && fis != null) {
-                    try {
-                        fis.close();
-                    } catch (IOException e) {
-                        throw new ACIDException("Failed to read a checkpoint file", e);
-                    }
-                }
             }
         }
 
-        //#. sort checkpointObjects in descending order by timeStamp to find out the most recent one.
+        //sort checkpointObjects in descending order by timeStamp to find out the most recent one.
         Collections.sort(checkpointObjectList);
 
-        //#. return the most recent one (the first one in sorted list)
+        //return the most recent one (the first one in sorted list)
         return checkpointObjectList.get(0);
     }
 
@@ -843,11 +623,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         FilenameFilter filter = new FilenameFilter() {
             @Override
             public boolean accept(File dir, String name) {
-                if (name.contains(CHECKPOINT_FILENAME_PREFIX)) {
-                    return true;
-                } else {
-                    return false;
-                }
+                return name.contains(CHECKPOINT_FILENAME_PREFIX);
             }
         };
 
@@ -861,7 +637,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
     }
 
-    private File createJobRecoveryFile(int jobId, String fileName) throws IOException {
+    @Override
+    public File createJobRecoveryFile(int jobId, String fileName) throws IOException {
         String recoveryDirPath = getRecoveryDirPath();
         Path JobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId);
         if (!Files.exists(JobRecoveryFolder)) {
@@ -878,12 +655,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         return jobRecoveryFile;
     }
 
-    private void deleteRecoveryTemporaryFiles() throws IOException {
+    @Override
+    public void deleteRecoveryTemporaryFiles() {
         String recoveryDirPath = getRecoveryDirPath();
         Path recoveryFolderPath = Paths.get(recoveryDirPath);
-        if (Files.exists(recoveryFolderPath)) {
-            FileUtils.deleteDirectory(recoveryFolderPath.toFile());
-        }
+        FileUtils.deleteQuietly(recoveryFolderPath.toFile());
     }
 
     private String getRecoveryDirPath() {
@@ -906,11 +682,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
-    /**
-     * Rollback a transaction
-     *
-     * @see org.apache.transaction.management.service.recovery.IRecoveryManager# rollbackTransaction (org.apache.TransactionContext.management.service.transaction .TransactionContext)
-     */
     @Override
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
         int abortedJobId = txnContext.getJobId().getId();
@@ -937,7 +708,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         long currentLSN = -1;
         TxnId loserEntity = null;
         List<Long> undoLSNSet = null;
-        String nodeId = logMgr.getNodeId();
+        //get active partitions on this node
+        Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
             logReader.initializeScan(firstLSN);
@@ -953,15 +725,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         LOGGER.info(logRecord.getLogRecordForDisplay());
                     }
                 }
-                if (logRecord.getNodeId().equals(nodeId)) {
-                    logJobId = logRecord.getJobId();
-                    if (logJobId != abortedJobId) {
-                        continue;
-                    }
-                    tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                            logRecord.getPKValue(), logRecord.getPKValueSize());
-                    switch (logRecord.getLogType()) {
-                        case LogType.UPDATE:
+                logJobId = logRecord.getJobId();
+                if (logJobId != abortedJobId) {
+                    continue;
+                }
+                tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        if (activePartitions.contains(logRecord.getResourcePartition())) {
                             undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
                             if (undoLSNSet == null) {
                                 loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
@@ -975,27 +747,30 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                 LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
                                         + tempKeyTxnId);
                             }
-                            break;
-                        case LogType.ENTITY_COMMIT:
-                        case LogType.UPSERT_ENTITY_COMMIT:
+                        }
+                        break;
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
+                        if (activePartitions.contains(logRecord.getResourcePartition())) {
                             jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
                                 LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
                                         + tempKeyTxnId);
                             }
-                            break;
-                        case LogType.JOB_COMMIT:
-                            throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
-                        case LogType.ABORT:
-                        case LogType.FLUSH:
-                            //ignore
-                            break;
-                        default:
-                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                    }
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                        throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+                    case LogType.ABORT:
+                    case LogType.FLUSH:
+                        //ignore
+                        break;
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                 }
             }
+
             if (currentLSN != lastLSN) {
                 throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
                         + ") during abort( " + txnContext.getJobId() + ")");
@@ -1034,6 +809,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
                         + entityCommitLogCount + "/" + undoCount);
             }
+
         } finally {
             logReader.close();
         }
@@ -1096,242 +872,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
-    //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters.
-    //They need to be refactored to use partitions only once the log format includes partition id.
-    @Override
-    public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode)
-            throws IOException, ACIDException {
-        //delete any recovery files from previous failed recovery attempts
-        deleteRecoveryTemporaryFiles();
-
-        int updateLogCount = 0;
-        int entityCommitLogCount = 0;
-        int jobCommitLogCount = 0;
-        int redoCount = 0;
-        int abortLogCount = 0;
-        int jobId = -1;
-
-        state = SystemState.RECOVERING;
-        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
-
-        Set<Integer> winnerJobSet = new HashSet<Integer>();
-        jobId2WinnerEntitiesMap = new HashMap<>();
-
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-        JobEntityCommits jobEntityWinners = null;
-        //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        if (lowWaterMarkLSN < readableSmallestLSN) {
-            lowWaterMarkLSN = readableSmallestLSN;
-        }
-        //-------------------------------------------------------------------------
-        //  [ analysis phase ]
-        //  - collect all committed Lsn
-        //-------------------------------------------------------------------------
-        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
-        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-        //get datasetLifeCycleManager
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
-        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
-                .getLocalResourceRepository();
-        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
-        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
-
-        //#. set log reader to the lowWaterMarkLsn
-        ILogReader logReader = logMgr.getLogReader(true);
-        ILogRecord logRecord = null;
-        try {
-            logReader.initializeScan(lowWaterMarkLSN);
-            logRecord = logReader.next();
-            while (logRecord != null) {
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
-                }
-                //TODO update this to use partitions once the log format is updated to include partitions
-                if (logRecord.getNodeId().equals(failedNode)) {
-                    switch (logRecord.getLogType()) {
-                        case LogType.UPDATE:
-                            updateLogCount++;
-                            break;
-                        case LogType.JOB_COMMIT:
-                            jobId = logRecord.getJobId();
-                            winnerJobSet.add(jobId);
-                            if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                                //to delete any spilled files as well
-                                jobEntityWinners.clear();
-                                jobId2WinnerEntitiesMap.remove(jobId);
-                            }
-                            jobCommitLogCount++;
-                            break;
-                        case LogType.ENTITY_COMMIT:
-                            jobId = logRecord.getJobId();
-                            if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                                jobEntityWinners = new JobEntityCommits(jobId);
-                                if (needToFreeMemory()) {
-                                    //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
-                                    //This could happen only when we have many jobs with a small number of records and none of them has a job commit.
-                                    freeJobsCachedEntities(jobId);
-                                }
-                                jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
-                            } else {
-                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                            }
-                            jobEntityWinners.add(logRecord);
-                            entityCommitLogCount++;
-                            break;
-                        case LogType.ABORT:
-                            abortLogCount++;
-                            break;
-                        case LogType.FLUSH:
-                            break;
-                        default:
-                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                    }
-                }
-                logRecord = logReader.next();
-            }
-
-            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
-            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
-                winners.prepareForSearch();
-            }
-            //-------------------------------------------------------------------------
-            //  [ redo phase ]
-            //  - redo if
-            //    1) The TxnId is committed && --> guarantee durability
-            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
-            //-------------------------------------------------------------------------
-            LOGGER.info("[RecoveryMgr] in redo phase");
-
-            long resourceId;
-            long maxDiskLastLsn;
-            long LSN = -1;
-            ILSMIndex index = null;
-            LocalResource localResource = null;
-            ILocalResourceMetadata localResourceMetadata = null;
-            boolean foundWinner = false;
-            //set log reader to the lowWaterMarkLsn again.
-            logReader.initializeScan(lowWaterMarkLSN);
-            logRecord = logReader.next();
-            while (logRecord != null) {
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
-                }
-                //TODO update this to check for partitions instead of node id once the log format is updated to include partitions
-                if (logRecord.getNodeId().equals(failedNode)) {
-                    LSN = logRecord.getLSN();
-                    jobId = logRecord.getJobId();
-                    foundWinner = false;
-                    switch (logRecord.getLogType()) {
-                        case LogType.UPDATE:
-                            if (winnerJobSet.contains(jobId)) {
-                                foundWinner = true;
-                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                        logRecord.getPKValue(), logRecord.getPKValueSize());
-                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
-                                    foundWinner = true;
-                                }
-                            }
-                            if (foundWinner) {
-                                resourceId = logRecord.getResourceId();
-                                localResource = resourcesMap.get(resourceId);
-                                /*******************************************************************
-                                 * [Notice]
-                                 * -> Issue
-                                 * An index drop may cause a problem during redo.
-                                 * The operation to be redone cannot be replayed because the corresponding index
-                                 * may no longer exist on the NC due to a possible index drop DDL operation.
-                                 * -> Approach
-                                 * Detect and skip the problem during redo.
-                                 * More specifically, the problem is detected when the localResource of
-                                 * the corresponding index is retrieved and turns out to be 'null'.
-                                 * If null is returned, then simply move on to the next
-                                 * log record.
-                                 *******************************************************************/
-                                if (localResource == null) {
-                                    logRecord = logReader.next();
-                                    continue;
-                                }
-                                /*******************************************************************/
-
-                                //get index instance from IndexLifeCycleManager
-                                //if index is not registered into IndexLifeCycleManager,
-                                //create the index using LocalMetadata stored in LocalResourceRepository
-                                //get partition path in this node
-                                String partitionIODevicePath = localResourceRepository
-                                        .getPartitionPath(localResource.getPartition());
-                                String resourceAbsolutePath = partitionIODevicePath + File.separator
-                                        + localResource.getResourceName();
-                                localResource.setResourcePath(resourceAbsolutePath);
-                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
-                                if (index == null) {
-                                    //#. create index instance and register to indexLifeCycleManager
-                                    localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
-                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                            resourceAbsolutePath, localResource.getPartition());
-                                    datasetLifecycleManager.register(resourceAbsolutePath, index);
-                                    datasetLifecycleManager.open(resourceAbsolutePath);
-
-                                    //#. get maxDiskLastLSN
-                                    ILSMIndex lsmIndex = index;
-                                    try {
-                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
-                                                .getIOOperationCallback())
-                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
-                                    } catch (HyracksDataException e) {
-                                        datasetLifecycleManager.close(resourceAbsolutePath);
-                                        throw e;
-                                    }
-
-                                    //#. set resourceId and maxDiskLastLSN to the map
-                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
-                                } else {
-                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
-                                }
-
-                                if (LSN > maxDiskLastLsn) {
-                                    redo(logRecord, datasetLifecycleManager);
-                                    redoCount++;
-                                }
-                            }
-                            break;
-                        case LogType.JOB_COMMIT:
-                        case LogType.ENTITY_COMMIT:
-                        case LogType.ABORT:
-                        case LogType.FLUSH:
-                            //do nothing
-                            break;
-                        default:
-                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                    }
-                }
-                logRecord = logReader.next();
-            }
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("[RecoveryMgr] recovery is completed.");
-                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
-                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
-                        + "/" + redoCount);
-            }
-        } finally {
-            logReader.close();
-
-            //close all indexes
-            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
-            for (long r : resourceIdList) {
-                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
-            }
-
-            //delete any recovery files after completing recovery
-            deleteRecoveryTemporaryFiles();
-        }
-    }
-
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final int jobId;
@@ -1499,153 +1039,4 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             }
         }
     }
-}
-
-class TxnId {
-    public boolean isByteArrayPKValue;
-    public int jobId;
-    public int datasetId;
-    public int pkHashValue;
-    public int pkSize;
-    public byte[] byteArrayPKValue;
-    public ITupleReference tupleReferencePKValue;
-
-    public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
-            boolean isByteArrayPKValue) {
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.pkHashValue = pkHashValue;
-        this.pkSize = pkSize;
-        this.isByteArrayPKValue = isByteArrayPKValue;
-        if (isByteArrayPKValue) {
-            this.byteArrayPKValue = new byte[pkSize];
-            readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
-        } else {
-            this.tupleReferencePKValue = pkValue;
-        }
-    }
-
-    public TxnId() {
-    }
-
-    private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
-        int readOffset = pkValue.getFieldStart(0);
-        byte[] readBuffer = pkValue.getFieldData(0);
-        for (int i = 0; i < pkSize; i++) {
-            byteArrayPKValue[i] = readBuffer[readOffset + i];
-        }
-    }
-
-    public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.pkHashValue = pkHashValue;
-        this.tupleReferencePKValue = pkValue;
-        this.pkSize = pkSize;
-        this.isByteArrayPKValue = false;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return pkHashValue;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof TxnId)) {
-            return false;
-        }
-        TxnId txnId = (TxnId) o;
-        return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId
-                && pkSize == txnId.pkSize && isEqualTo(txnId));
-    }
-
-    private boolean isEqualTo(TxnId txnId) {
-        if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
-            return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
-        } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
-            return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize);
-        } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
-            return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize);
-        } else {
-            return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize);
-        }
-    }
-
-    private static boolean isEqual(byte[] a, byte[] b, int size) {
-        for (int i = 0; i < size; i++) {
-            if (a[i] != b[i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean isEqual(byte[] a, ITupleReference b, int size) {
-        int readOffset = b.getFieldStart(0);
-        byte[] readBuffer = b.getFieldData(0);
-        for (int i = 0; i < size; i++) {
-            if (a[i] != readBuffer[readOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean isEqual(ITupleReference a, ITupleReference b, int size) {
-        int aOffset = a.getFieldStart(0);
-        byte[] aBuffer = a.getFieldData(0);
-        int bOffset = b.getFieldStart(0);
-        byte[] bBuffer = b.getFieldData(0);
-        for (int i = 0; i < size; i++) {
-            if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public void serialize(ByteBuffer buffer) {
-        buffer.putInt(jobId);
-        buffer.putInt(datasetId);
-        buffer.putInt(pkHashValue);
-        buffer.putInt(pkSize);
-        buffer.put((byte) (isByteArrayPKValue ? 1 : 0));
-        if (isByteArrayPKValue) {
-            buffer.put(byteArrayPKValue);
-        }
-    }
-
-    public static TxnId deserialize(ByteBuffer buffer) {
-        TxnId txnId = new TxnId();
-        txnId.jobId = buffer.getInt();
-        txnId.datasetId = buffer.getInt();
-        txnId.pkHashValue = buffer.getInt();
-        txnId.pkSize = buffer.getInt();
-        txnId.isByteArrayPKValue = (buffer.get() == 1);
-        if (txnId.isByteArrayPKValue) {
-            byte[] byteArrayPKValue = new byte[txnId.pkSize];
-            buffer.get(byteArrayPKValue);
-            txnId.byteArrayPKValue = byteArrayPKValue;
-        }
-        return txnId;
-    }
-
-    public int getCurrentSize() {
-        //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
-        int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
-        //byte arraySize
-        if (isByteArrayPKValue && byteArrayPKValue != null) {
-            size += byteArrayPKValue.length;
-        }
-        return size;
-    }
 }
\ No newline at end of file
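
For context, the redo rule that the removed comments describe (replay an UPDATE log record only when its job is a committed winner and its LSN is newer than the last LSN already persisted for the index) can be illustrated with the following sketch; the class and method names are hypothetical and are not part of this patch:

    import java.util.HashSet;
    import java.util.Set;

    public class RedoRuleSketch {
        // Redo iff the job committed (winner) and the log LSN is newer than
        // the component LSN already on disk for the target index.
        static boolean shouldRedo(long lsn, long maxDiskLastLsn, int jobId, Set<Integer> winnerJobSet) {
            return winnerJobSet.contains(jobId) && lsn > maxDiskLastLsn;
        }

        public static void main(String[] args) {
            Set<Integer> winners = new HashSet<>();
            winners.add(7);
            System.out.println(shouldRedo(120L, 100L, 7, winners)); // true: committed and newer than disk
            System.out.println(shouldRedo(90L, 100L, 7, winners));  // false: already reflected on disk
            System.out.println(shouldRedo(120L, 100L, 8, winners)); // false: job never committed
        }
    }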

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
new file mode 100644
index 0000000..bd4a49a
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TxnId {
+    public boolean isByteArrayPKValue;
+    public int jobId;
+    public int datasetId;
+    public int pkHashValue;
+    public int pkSize;
+    public byte[] byteArrayPKValue;
+    public ITupleReference tupleReferencePKValue;
+
+    public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
+            boolean isByteArrayPKValue) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.pkHashValue = pkHashValue;
+        this.pkSize = pkSize;
+        this.isByteArrayPKValue = isByteArrayPKValue;
+        if (isByteArrayPKValue) {
+            this.byteArrayPKValue = new byte[pkSize];
+            readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
+        } else {
+            this.tupleReferencePKValue = pkValue;
+        }
+    }
+
+    public TxnId() {
+    }
+
+    private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+        int readOffset = pkValue.getFieldStart(0);
+        byte[] readBuffer = pkValue.getFieldData(0);
+        for (int i = 0; i < pkSize; i++) {
+            byteArrayPKValue[i] = readBuffer[readOffset + i];
+        }
+    }
+
+    public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.pkHashValue = pkHashValue;
+        this.tupleReferencePKValue = pkValue;
+        this.pkSize = pkSize;
+        this.isByteArrayPKValue = false;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return pkHashValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof TxnId)) {
+            return false;
+        }
+        TxnId txnId = (TxnId) o;
+        return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId
+                && pkSize == txnId.pkSize && isEqualTo(txnId));
+    }
+
+    private boolean isEqualTo(TxnId txnId) {
+        if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
+            return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
+        } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
+            return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize);
+        } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
+            return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize);
+        } else {
+            return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize);
+        }
+    }
+
+    private static boolean isEqual(byte[] a, byte[] b, int size) {
+        for (int i = 0; i < size; i++) {
+            if (a[i] != b[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean isEqual(byte[] a, ITupleReference b, int size) {
+        int readOffset = b.getFieldStart(0);
+        byte[] readBuffer = b.getFieldData(0);
+        for (int i = 0; i < size; i++) {
+            if (a[i] != readBuffer[readOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+        int aOffset = a.getFieldStart(0);
+        byte[] aBuffer = a.getFieldData(0);
+        int bOffset = b.getFieldStart(0);
+        byte[] bBuffer = b.getFieldData(0);
+        for (int i = 0; i < size; i++) {
+            if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void serialize(ByteBuffer buffer) {
+        buffer.putInt(jobId);
+        buffer.putInt(datasetId);
+        buffer.putInt(pkHashValue);
+        buffer.putInt(pkSize);
+        buffer.put((byte) (isByteArrayPKValue ? 1 : 0));
+        if (isByteArrayPKValue) {
+            buffer.put(byteArrayPKValue);
+        }
+    }
+
+    public static TxnId deserialize(ByteBuffer buffer) {
+        TxnId txnId = new TxnId();
+        txnId.jobId = buffer.getInt();
+        txnId.datasetId = buffer.getInt();
+        txnId.pkHashValue = buffer.getInt();
+        txnId.pkSize = buffer.getInt();
+        txnId.isByteArrayPKValue = (buffer.get() == 1);
+        if (txnId.isByteArrayPKValue) {
+            byte[] byteArrayPKValue = new byte[txnId.pkSize];
+            buffer.get(byteArrayPKValue);
+            txnId.byteArrayPKValue = byteArrayPKValue;
+        }
+        return txnId;
+    }
+
+    public int getCurrentSize() {
+        //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
+        int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+        //byte arraySize
+        if (isByteArrayPKValue && byteArrayPKValue != null) {
+            size += byteArrayPKValue.length;
+        }
+        return size;
+    }
+}
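
For reference, a minimal usage sketch of the relocated TxnId (hypothetical driver class, arbitrary field values, not part of this change), round-tripping an instance through a ByteBuffer via the serialize/deserialize methods added in this file:

    import java.nio.ByteBuffer;

    import org.apache.asterix.transaction.management.service.recovery.TxnId;

    public class TxnIdRoundTripSketch {
        public static void main(String[] args) {
            TxnId original = new TxnId();
            original.jobId = 7;
            original.datasetId = 3;
            original.pkHashValue = 42;
            original.pkSize = 0;               // no PK bytes stored in this example
            original.isByteArrayPKValue = false;

            // getCurrentSize() accounts for the header fields (and the PK bytes, if any).
            ByteBuffer buffer = ByteBuffer.allocate(original.getCurrentSize());
            original.serialize(buffer);
            buffer.flip();

            TxnId copy = TxnId.deserialize(buffer);
            System.out.println(copy);          // prints [7,3,42,0]
        }
    }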

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index 47a92cb..215eb14 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -110,7 +110,6 @@ public class TransactionContext implements ITransactionContext, Serializable {
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
         logRecord = new LogRecord();
-        logRecord.setNodeId(transactionSubsystem.getId());
         transactorNumActiveOperations = new AtomicInteger(0);
     }
 

