asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [1/6] incubator-asterixdb git commit: Introducing Data Replication To AsterixDB
Date Thu, 12 Nov 2015 20:51:28 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 7c7e85690 -> 209f39075


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 9bc4c53..f602156 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -49,8 +49,11 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogReader;
@@ -92,6 +95,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private final LogManager logMgr;
     private final int checkpointHistory;
     private final long SHARP_CHECKPOINT_LSN = -1;
+    private final boolean replicationEnabled;
+    public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
     private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private static final long MEGABYTE = 1024L * 1024L;
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
@@ -108,6 +113,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         this.txnSubsystem = txnSubsystem;
         this.logMgr = (LogManager) txnSubsystem.getLogManager();
         this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
+        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getAppContext();
+        this.replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
     }
 
     /**
@@ -133,28 +141,45 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             return state;
         }
 
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        if (logMgr.getAppendLSN() == readableSmallestLSN) {
-            if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("[Warning] ---------------------------------------------------");
-                    LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
-                    LOGGER.info("[Warning] ---------------------------------------------------");
-                    //No choice but continuing when the log files are lost. 
-                }
+        if (replicationEnabled) {
+            if (checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+                //no logs exist
+                state = SystemState.HEALTHY;
+                return state;
+            } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) {
+                //only remote logs exist
+                state = SystemState.HEALTHY;
+                return state;
+            } else {
+                //need to perform remote recovery
+                state = SystemState.CORRUPTED;
+                return state;
             }
-            state = SystemState.HEALTHY;
-            return state;
-        } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
-                && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
-            state = SystemState.HEALTHY;
-            return state;
         } else {
-            state = SystemState.CORRUPTED;
-            return state;
+            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+            if (logMgr.getAppendLSN() == readableSmallestLSN) {
+                if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("[Warning] ---------------------------------------------------");
+                        LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
+                        LOGGER.info("[Warning] ---------------------------------------------------");
+                        //No choice but continuing when the log files are lost. 
+                    }
+                }
+                state = SystemState.HEALTHY;
+                return state;
+            } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+                    && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+                state = SystemState.HEALTHY;
+                return state;
+            } else {
+                state = SystemState.CORRUPTED;
+                return state;
+            }
         }
     }
 
+    //This method is used only when replication is disabled. Therefore, there is no need to check logs node ids
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
         //delete any recovery files from previous failed recovery attempts
         deleteRecoveryTemporaryFiles();
@@ -383,6 +408,205 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     @Override
+    public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException {
+        int updateLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobCommitLogCount = 0;
+        int redoCount = 0;
+        int abortLogCount = 0;
+        int jobId = -1;
+
+        state = SystemState.RECOVERING;
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("[RecoveryMgr] starting recovery ...");
+        }
+
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
+        //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
+        Set<TxnId> winnerEntitySet = null;
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        TxnId winnerEntity = null;
+
+        //-------------------------------------------------------------------------
+        //  [ analysis phase ]
+        //  - collect all committed Lsn 
+        //-------------------------------------------------------------------------
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("[RecoveryMgr] in analysis phase");
+        }
+
+        String nodeId = logMgr.getNodeId();
+        ILogRecord logRecord;
+        for (int i = 0; i < remoteLogs.size(); i++) {
+            logRecord = remoteLogs.get(i);
+            if (IS_DEBUG_MODE) {
+                LOGGER.info(logRecord.getLogRecordForDisplay());
+            }
+
+            if (logRecord.getNodeId().equals(nodeId)) {
+                //update max jobId
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        updateLogCount++;
+                        break;
+                    case LogType.JOB_COMMIT:
+                        winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
+                        jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
+                        jobCommitLogCount++;
+                        break;
+                    case LogType.ENTITY_COMMIT:
+                        jobId = logRecord.getJobId();
+                        winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+                        if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+                            winnerEntitySet = new HashSet<TxnId>();
+                            jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
+                        } else {
+                            winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+                        }
+                        winnerEntitySet.add(winnerEntity);
+                        entityCommitLogCount++;
+                        break;
+                    case LogType.ABORT:
+                        abortLogCount++;
+                        break;
+                    case LogType.FLUSH:
+                        break;
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                }
+            }
+        }
+
+        //-------------------------------------------------------------------------
+        //  [ redo phase ]
+        //  - redo if
+        //    1) The TxnId is committed && --> guarantee durability
+        //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+        //-------------------------------------------------------------------------
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("[RecoveryMgr] in redo phase");
+        }
+
+        long resourceId;
+        long maxDiskLastLsn;
+        long LSN = -1;
+        ILSMIndex index = null;
+        LocalResource localResource = null;
+        ILocalResourceMetadata localResourceMetadata = null;
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+        boolean foundWinner = false;
+
+        //#. get indexLifeCycleManager 
+        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
+        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
+                .loadAndGetAllResources();
+        //#. set log reader to the lowWaterMarkLsn again.
+        for (int i = 0; i < remoteLogs.size(); i++) {
+            logRecord = remoteLogs.get(i);
+            if (IS_DEBUG_MODE) {
+                LOGGER.info(logRecord.getLogRecordForDisplay());
+            }
+            if (logRecord.getNodeId().equals(nodeId)) {
+                LSN = logRecord.getLSN();
+                jobId = logRecord.getJobId();
+                foundWinner = false;
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        if (winnerJobSet.contains(Integer.valueOf(jobId))) {
+                            foundWinner = true;
+                        } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+                            winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+                            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                    logRecord.getPKValue(), logRecord.getPKValueSize());
+                            if (winnerEntitySet.contains(tempKeyTxnId)) {
+                                foundWinner = true;
+                            }
+                        }
+                        if (foundWinner) {
+                            resourceId = logRecord.getResourceId();
+                            localResource = resourcesMap.get(resourceId);
+
+                            /*******************************************************************
+                             * [Notice]
+                             * -> Issue
+                             * Delete index may cause a problem during redo.
+                             * The index operation to be redone couldn't be redone because the corresponding index
+                             * may not exist in NC due to the possible index drop DDL operation.
+                             * -> Approach
+                             * Avoid the problem during redo.
+                             * More specifically, the problem will be detected when the localResource of
+                             * the corresponding index is retrieved, which will end up with 'null'.
+                             * If null is returned, then just go and process the next
+                             * log record.
+                             *******************************************************************/
+                            if (localResource == null) {
+                                continue;
+                            }
+                            /*******************************************************************/
+
+                            //get index instance from IndexLifeCycleManager
+                            //if index is not registered into IndexLifeCycleManager,
+                            //create the index using LocalMetadata stored in LocalResourceRepository
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+                            if (index == null) {
+                                //#. create index instance and register to indexLifeCycleManager
+                                localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                        localResource.getResourceName(), localResource.getPartition());
+                                datasetLifecycleManager.register(localResource.getResourceName(), index);
+                                datasetLifecycleManager.open(localResource.getResourceName());
+
+                                //#. get maxDiskLastLSN
+                                ILSMIndex lsmIndex = index;
+                                maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+
+                                //#. set resourceId and maxDiskLastLSN to the map
+                                resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
+                            } else {
+                                maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
+                            }
+
+                            if (LSN > maxDiskLastLsn) {
+                                redo(logRecord, datasetLifecycleManager);
+                                redoCount++;
+                            }
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.ABORT:
+                    case LogType.FLUSH:
+
+                        //do nothing
+                        break;
+
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                }
+            }
+        }
+
+        //close all indexes
+        Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+        for (long r : resourceIdList) {
+            datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("[RecoveryMgr] remote recovery is completed.");
+            LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+                    + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/"
+                    + redoCount);
+        }
+    }
+
+    @Override
     public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
             throws ACIDException, HyracksDataException {
         long minMCTFirstLSN;
@@ -404,7 +628,26 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
             datasetLifecycleManager.flushAllDatasets();
-            minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
+            if (!replicationEnabled) {
+                minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
+            } else {
+                //if is shutting down, need to check if we need to keep any remote logs for dead replicas
+                if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
+                    Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
+                            .getReplicationManager().getDeadReplicasIds();
+                    if (deadReplicaIds.isEmpty()) {
+                        minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
+                    } else {
+                        //get min LSN of dead replicas remote resources
+                        IReplicaResourcesManager remoteResourcesManager = txnSubsystem
+                                .getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
+                        minMCTFirstLSN = remoteResourcesManager.getMinRemoteLSN(deadReplicaIds);
+                    }
+                } else {
+                    //start up complete checkpoint. Avoid deleting remote recovery logs.
+                    minMCTFirstLSN = getMinFirstLSN();
+                }
+            }
         } else {
             minMCTFirstLSN = getMinFirstLSN();
             if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
@@ -412,11 +655,21 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             } else {
                 //flush datasets with indexes behind target checkpoint LSN
                 datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
+                if (replicationEnabled) {
+                    //request remote replicas to flush lagging indexes
+                    IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                            .getAppContext().getReplicationManager();
+                    try {
+                        replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
             }
         }
 
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
-                txnMgr.getMaxJobId(), System.currentTimeMillis());
+                txnMgr.getMaxJobId(), System.currentTimeMillis(), isSharpCheckpoint);
 
         FileOutputStream fos = null;
         ObjectOutputStream oosToFos = null;
@@ -456,7 +709,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         if (isSharpCheckpoint) {
             try {
-                logMgr.renewLogFiles();
+                if (minMCTFirstLSN == SHARP_CHECKPOINT_LSN) {
+                    logMgr.renewLogFiles();
+                } else {
+                    logMgr.deleteOldLogFiles(minMCTFirstLSN);
+                }
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -475,6 +732,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     public long getMinFirstLSN() throws HyracksDataException {
+        long minFirstLSN = getLocalMinFirstLSN();
+
+        //if replication is enabled, consider replica resources min LSN
+        if (replicationEnabled) {
+            long remoteMinFirstLSN = getRemoteMinFirstLSN();
+            minFirstLSN = Math.min(minFirstLSN, remoteMinFirstLSN);
+        }
+
+        return minFirstLSN;
+    }
+
+    public long getLocalMinFirstLSN() throws HyracksDataException {
         IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
         List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
@@ -485,6 +754,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             for (IIndex index : openIndexList) {
                 AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
                         .getIOOperationCallback();
+
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
                     minFirstLSN = Math.min(minFirstLSN, firstLSN);
@@ -494,6 +764,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         return minFirstLSN;
     }
 
+    private long getRemoteMinFirstLSN() {
+        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getAppContext();
+
+        Set<String> replicaIds = propertiesProvider.getReplicationProperties()
+                .getRemoteReplicasIds(txnSubsystem.getId());
+        IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext().getReplicaResourcesManager();
+
+        return remoteResourcesManager.getMinRemoteLSN(replicaIds);
+    }
+
     private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
         CheckpointObject checkpointObject = null;
 
@@ -641,7 +923,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         long currentLSN = -1;
         TxnId loserEntity = null;
         List<Long> undoLSNSet = null;
-
+        String nodeId = logMgr.getNodeId();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
             logReader.initializeScan(firstLSN);
@@ -652,48 +934,51 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     break;
                 } else {
                     currentLSN = logRecord.getLSN();
+
                     if (IS_DEBUG_MODE) {
                         LOGGER.info(logRecord.getLogRecordForDisplay());
                     }
                 }
-                logJobId = logRecord.getJobId();
-                if (logJobId != abortedJobId) {
-                    continue;
-                }
-                tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                        logRecord.getPKValue(), logRecord.getPKValueSize());
-                switch (logRecord.getLogType()) {
-                    case LogType.UPDATE:
-                        undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
-                        if (undoLSNSet == null) {
-                            loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                    logRecord.getPKValue(), logRecord.getPKValueSize(), true);
-                            undoLSNSet = new LinkedList<Long>();
-                            jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
-                        }
-                        undoLSNSet.add(currentLSN);
-                        updateLogCount++;
-                        if (IS_DEBUG_MODE) {
-                            LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
-                                    + tempKeyTxnId);
-                        }
-                        break;
-                    case LogType.ENTITY_COMMIT:
-                        jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
-                        entityCommitLogCount++;
-                        if (IS_DEBUG_MODE) {
-                            LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
-                                    + tempKeyTxnId);
-                        }
-                        break;
-                    case LogType.JOB_COMMIT:
-                        throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
-                    case LogType.ABORT:
-                    case LogType.FLUSH:
-                        //ignore
-                        break;
-                    default:
-                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                if (logRecord.getNodeId().equals(nodeId)) {
+                    logJobId = logRecord.getJobId();
+                    if (logJobId != abortedJobId) {
+                        continue;
+                    }
+                    tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                            logRecord.getPKValue(), logRecord.getPKValueSize());
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
+                            if (undoLSNSet == null) {
+                                loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                        logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+                                undoLSNSet = new LinkedList<Long>();
+                                jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
+                            }
+                            undoLSNSet.add(currentLSN);
+                            updateLogCount++;
+                            if (IS_DEBUG_MODE) {
+                                LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+                                        + tempKeyTxnId);
+                            }
+                            break;
+                        case LogType.ENTITY_COMMIT:
+                            jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
+                            entityCommitLogCount++;
+                            if (IS_DEBUG_MODE) {
+                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                                        + tempKeyTxnId);
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                            throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            //ignore
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
                 }
             }
             if (currentLSN != lastLSN) {
@@ -743,8 +1028,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     @Override
-    public void stop(boolean dumpState, OutputStream os) {
-        //no op
+    public void stop(boolean dumpState, OutputStream os) throws IOException {
+        try {
+            checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
+        } catch (HyracksDataException | ACIDException e) {
+            e.printStackTrace();
+            throw new IOException(e);
+        }
     }
 
     @Override
@@ -784,6 +1074,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
         } catch (Exception e) {
+            e.printStackTrace();
             throw new IllegalStateException("Failed to redo", e);
         }
     }
@@ -1105,4 +1396,4 @@ class TxnId {
         }
         return size;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index ecb1bc9..bc64484 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -110,6 +110,7 @@ public class TransactionContext implements ITransactionContext, Serializable {
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
         logRecord = new LogRecord();
+        logRecord.setNodeId(transactionSubsystem.getId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index eeb65e2..d371e94 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILockManager;
@@ -28,6 +30,7 @@ import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
 import org.apache.asterix.transaction.management.service.recovery.CheckpointThread;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 
@@ -52,8 +55,21 @@ public class TransactionSubsystem implements ITransactionSubsystem {
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new ConcurrentLockManager(this);
-        this.logManager = new LogManager(this);
+
+        AsterixReplicationProperties asterixReplicationProperties = null;
+        if (asterixAppRuntimeContextProvider != null) {
+            asterixReplicationProperties = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                    .getAppContext()).getReplicationProperties();
+        }
+
+        if (asterixReplicationProperties != null && asterixReplicationProperties.isReplicationEnabled()) {
+            this.logManager = new LogManagerWithReplication(this);
+        } else {
+            this.logManager = new LogManager(this);
+        }
+
         this.recoveryManager = new RecoveryManager(this);
+
         if (asterixAppRuntimeContextProvider != null) {
             this.checkpointThread = new CheckpointThread(recoveryManager,
                     asterixAppRuntimeContextProvider.getDatasetLifecycleManager(),logManager,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a1a36f6..d8c7fc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -388,6 +388,7 @@
         <module>asterix-doc</module>
         <module>asterix-fuzzyjoin</module>
         <module>asterix-yarn</module>
+        <module>asterix-replication</module>
     </modules>
 
     <repositories>


Mime
View raw message