asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject incubator-asterixdb git commit: ASTERIXDB-1425 & ASTERIXDB-1450: Fix LogReader random reads
Date Fri, 20 May 2016 21:20:53 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 60e1699b4 -> 5842e472c


ASTERIXDB-1425 & ASTERIXDB-1450: Fix LogReader random reads

- Fix random reads for truncated logs (ASTERIXDB-1425).
- Fix log file partition size boundary check (ASTERIXDB-1450).
- Fix deadlock between LogReader and LogFlusher.
- Prevent checkpoints from deleting log files being accessed by rollbacks.
- Make rollbacks start from LSN = max(txnFirstLSN, minMemoryLSN).
- Make default log partition size 256MB instead of 2GB.

Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45
Reviewed-on: https://asterix-gerrit.ics.uci.edu/867
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/5842e472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/5842e472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/5842e472

Branch: refs/heads/master
Commit: 5842e472c530bcfb84a7c642d617403761b8a83b
Parents: 60e1699
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Fri May 20 13:37:06 2016 -0700
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Fri May 20 14:20:43 2016 -0700

----------------------------------------------------------------------
 .../config/AsterixTransactionProperties.java    |   2 +-
 .../common/transactions/ILogManager.java        |  20 ++++
 .../asterix/common/transactions/TxnLogFile.java |  66 +++++++++++
 .../management/service/logging/LogManager.java  | 110 +++++++++++++++++--
 .../management/service/logging/LogReader.java   |  97 +++++++++-------
 .../service/recovery/RecoveryManager.java       |  29 +++--
 6 files changed, 269 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index c4a5e8e..356dad3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -32,7 +32,7 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties {
     private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = (128 << 10); // 128KB
 
     private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
-    private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = ((long) 2 << 30); // 2GB
+    private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, StorageUnit.MEGABYTE);
 
     private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
     private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index cff4184..97d4897 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.transactions;
 
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -85,4 +86,23 @@ public interface ILogManager {
      */
     public int getNumLogPages();
 
+    /**
+     * Opens a file channel to the log file which contains {@code LSN}.
+     * The start position of the file channel will be at the first LSN of the file.
+     *
+     * @param LSN
+     * @return
+     * @throws IOException
+     *             if the log file does not exist.
+     */
+    public TxnLogFile getLogFile(long LSN) throws IOException;
+
+    /**
+     * Closes the log file.
+     *
+     * @param logFileRef
+     * @param fileChannel
+     * @throws IOException
+     */
+    public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
new file mode 100644
index 0000000..e535206
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class TxnLogFile {
+
+    private final FileChannel fileChannel;
+    private final long logFileId;
+    private final long fileBeginLSN;
+    private final ILogManager logManager;
+    private boolean open = true;
+
+    public TxnLogFile(ILogManager logManager, FileChannel fileChannel, long logFileId, long fileBeginLSN) {
+        this.logManager = logManager;
+        this.fileChannel = fileChannel;
+        this.logFileId = logFileId;
+        this.fileBeginLSN = fileBeginLSN;
+    }
+
+    public void position(long newPosition) throws IOException {
+        fileChannel.position(newPosition);
+    }
+
+    public long size() throws IOException {
+        return fileChannel.size();
+    }
+
+    public int read(ByteBuffer readBuffer) throws IOException {
+        return fileChannel.read(readBuffer);
+    }
+
+    public long getLogFileId() {
+        return logFileId;
+    }
+
+    public synchronized void close() throws IOException {
+        if (open) {
+            logManager.closeLogFile(this, fileChannel);
+            open = false;
+        }
+    }
+
+    public long getFileBeginLSN() {
+        return fileBeginLSN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f10520b..be0435a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +48,7 @@ import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.LogManagerProperties;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -74,6 +76,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     private final String nodeId;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
     private final FlushLogsLogger flushLogsLogger;
+    private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
 
     public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
@@ -148,7 +151,13 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
                         "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
             }
         }
-        if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
+
+        /**
+         * To eliminate the case where the modulo of the next appendLSN = 0 (the next
+         * appendLSN = the first LSN of the next log file), we do not allow a log to be
+         * written at the last offset of the current file.
+         */
+        if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() >= logFileSize) {
             prepareNextLogFile();
             appendPage.isFull(true);
             getAndInitNewPage();
@@ -194,8 +203,28 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     protected void prepareNextLogFile() {
+        //wait until all log records have been flushed in the current file
+        synchronized (flushLSN) {
+            while (flushLSN.get() != appendLSN.get()) {
+                //notification will come from LogBuffer.internalFlush(.)
+                try {
+                    flushLSN.wait();
+                } catch (InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Preparing new log file was interrupted");
+                    }
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        //move appendLSN and flushLSN to the first LSN of the next log file
         appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
+        flushLSN.set(appendLSN.get());
         appendChannel = getFileChannel(appendLSN.get(), true);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = "
+                    + appendLSN.get());
+        }
         appendPage.isLastPage(true);
         //[Notice]
         //the current log file channel is closed if
@@ -323,15 +352,32 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
 
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
-
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
-            for (Long id : logFileIds) {
-                if (id < checkpointLSNLogFileID) {
+            //sort log files from oldest to newest
+            Collections.sort(logFileIds);
+            /**
+             * At this point, any future LogReader should read from LSN >= checkpointLSN
+             */
+            synchronized (txnLogFileId2ReaderCount) {
+                for (Long id : logFileIds) {
+                    /**
+                     * Stop deletion if:
+                     * The log file which contains the checkpointLSN has been reached.
+                     * The oldest log file being accessed by a LogReader has been reached.
+                     */
+                    if (id >= checkpointLSNLogFileID
+                            || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
+                        break;
+                    }
+
+                    //delete old log file
                     File file = new File(getLogFilePath(id));
-                    if (!file.delete()) {
-                        throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+                    file.delete();
+                    txnLogFileId2ReaderCount.remove(id);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Deleted log file " + file.getAbsolutePath());
                     }
                 }
             }
@@ -365,6 +411,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
                 throw new IllegalStateException("Failed to close a fileChannel of a log file");
             }
         }
+        txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
             for (Long id : logFileIds) {
@@ -434,7 +481,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         return (new File(path)).mkdir();
     }
 
-    public FileChannel getFileChannel(long lsn, boolean create) {
+    private FileChannel getFileChannel(long lsn, boolean create) {
         FileChannel newFileChannel = null;
         try {
             long fileId = getLogFileId(lsn);
@@ -496,6 +543,55 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         return numLogPages;
     }
 
+    @Override
+    public TxnLogFile getLogFile(long LSN) throws IOException {
+        long fileId = getLogFileId(LSN);
+        String logFilePath = getLogFilePath(fileId);
+        File file = new File(logFilePath);
+        if (!file.exists()) {
+            throw new IOException("Log file with id(" + fileId + ") was not found. Requested LSN: " + LSN);
+        }
+        RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
+        FileChannel newFileChannel = raf.getChannel();
+        TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
+        touchLogFile(fileId);
+        return logFile;
+    }
+
+    @Override
+    public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException {
+        if (!fileChannel.isOpen()) {
+            throw new IllegalStateException("File channel is not open");
+        }
+        fileChannel.close();
+        untouchLogFile(logFileRef.getLogFileId());
+    }
+
+    private void touchLogFile(long fileId) {
+        synchronized (txnLogFileId2ReaderCount) {
+            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+                txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
+            } else {
+                txnLogFileId2ReaderCount.put(fileId, 1);
+            }
+        }
+    }
+
+    private void untouchLogFile(long fileId) {
+        synchronized (txnLogFileId2ReaderCount) {
+            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+                int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
+                if (newReaderCount < 0) {
+                    throw new IllegalStateException(
+                            "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
+                }
+                txnLogFileId2ReaderCount.put(fileId, newReaderCount);
+            } else {
+                throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
+            }
+        }
+    }
+
     /**
      * This class is used to log FLUSH logs.
      * FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 0b1d320..1592aba 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -20,15 +20,16 @@ package org.apache.asterix.transaction.management.service.logging;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
 
 /**
  * NOTE: Many method calls of this class are not thread safe.
@@ -38,7 +39,7 @@ public class LogReader implements ILogReader {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName());
-    private final LogManager logMgr;
+    private final ILogManager logMgr;
     private final long logFileSize;
     private final int logPageSize;
     private final MutableLong flushLSN;
@@ -48,14 +49,15 @@ public class LogReader implements ILogReader {
     private long readLSN;
     private long bufferBeginLSN;
     private long fileBeginLSN;
-    private FileChannel fileChannel;
+    private TxnLogFile logFile;
 
     private enum ReturnState {
         FLUSH,
         EOF
     };
 
-    public LogReader(LogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) {
+    public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN,
+            boolean isRecoveryMode) {
         this.logMgr = logMgr;
         this.logFileSize = logFileSize;
         this.logPageSize = logPageSize;
@@ -71,12 +73,13 @@ public class LogReader implements ILogReader {
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return;
         }
-        getFileChannel();
+        getLogFile();
         fillLogReadBuffer();
     }
 
     /**
      * Get the next log record from the log file.
+     *
      * @return A deserialized log record, or null if we have reached the end of the file.
      * @throws ACIDException
      */
@@ -119,11 +122,14 @@ public class LogReader implements ILogReader {
                     continue;
                 }
                 case BAD_CHKSUM: {
-                    LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
+                    LOGGER.severe(
+                            "Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
                     return null;
                 }
                 case OK:
                     break;
+                default:
+                    throw new IllegalStateException("Unexpected log read status: " + status);
 
             }
             // break the loop by default
@@ -136,14 +142,14 @@ public class LogReader implements ILogReader {
 
     private ReturnState waitForFlushOrReturnIfEOF() {
         synchronized (flushLSN) {
-            while (readLSN >= flushLSN.get()) {
+            while (readLSN > flushLSN.get()) {
                 if (isRecoveryMode) {
                     return ReturnState.EOF;
                 }
                 try {
                     if (IS_DEBUG_MODE) {
-                        LOGGER.info("waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: "
-                                + readLSN);
+                        LOGGER.info(
+                                "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
                     }
                     flushLSN.wait();
                 } catch (InterruptedException e) {
@@ -156,15 +162,16 @@ public class LogReader implements ILogReader {
 
     /**
      * Continues log analysis between log file splits.
+     *
      * @return true if log continues, false if EOF
      * @throws ACIDException
      */
     private boolean refillLogReadBuffer() throws ACIDException {
         try {
-            if (readLSN % logFileSize == fileChannel.size()) {
-                fileChannel.close();
+            if (readLSN % logFileSize == logFile.size()) {
+                logFile.close();
                 readLSN += logFileSize - (readLSN % logFileSize);
-                getFileChannel();
+                getLogFile();
             }
             return fillLogReadBuffer();
         } catch (IOException e) {
@@ -174,6 +181,7 @@ public class LogReader implements ILogReader {
 
     /**
      * Fills the log buffer with data from the log file at the current position
+     *
      * @return false if EOF, true otherwise
      * @throws ACIDException
      */
@@ -183,17 +191,17 @@ public class LogReader implements ILogReader {
     }
 
     private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException {
-        int size=0;
-        int read=0;
+        int size = 0;
+        int read = 0;
         readBuffer.position(0);
         readBuffer.limit(readSize);
         try {
-            fileChannel.position(readLSN % logFileSize);
+            logFile.position(readLSN % logFileSize);
             //We loop here because read() may return 0, but this simply means we are waiting on IO.
             //Therefore we want to break out only when either the buffer is full, or we reach EOF.
-            while( size < readSize && read != -1) {
-                read = fileChannel.read(readBuffer);
-                if(read>0) {
+            while (size < readSize && read != -1) {
+                read = logFile.read(readBuffer);
+                if (read > 0) {
                     size += read;
                 }
             }
@@ -202,7 +210,7 @@ public class LogReader implements ILogReader {
         }
         readBuffer.position(0);
         readBuffer.limit(size);
-        if(size == 0 && read == -1){
+        if (size == 0 && read == -1) {
             return false; //EOF
         }
         bufferBeginLSN = readLSN;
@@ -213,38 +221,37 @@ public class LogReader implements ILogReader {
     @Override
     public ILogRecord read(long LSN) throws ACIDException {
         readLSN = LSN;
+        //wait for the log to be flushed if needed before trying to read it.
         synchronized (flushLSN) {
-            while (readLSN >= flushLSN.get()) {
+            while (readLSN > flushLSN.get()) {
                 try {
                     flushLSN.wait();
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
                 }
             }
         }
         try {
-            if (fileChannel == null) {
-                getFileChannel();
+            if (logFile == null) {
+                //get the log file which contains readLSN
+                getLogFile();
                 fillLogReadBuffer();
-            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size()) {
-                fileChannel.close();
-                getFileChannel();
+            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
+                //log is not in the current log file
+                logFile.close();
+                getLogFile();
                 fillLogReadBuffer();
             } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) {
+                //log is not in the current read buffer
                 fillLogReadBuffer();
             } else {
+                //log is either completely in the current read buffer or truncated
                 readBuffer.position((int) (readLSN - bufferBeginLSN));
             }
         } catch (IOException e) {
             throw new ACIDException(e);
         }
-        boolean hasRemaining;
-        if(readBuffer.position() == readBuffer.limit()){
-            hasRemaining = refillLogReadBuffer();
-            if(!hasRemaining){
-                throw new ACIDException("LSN is out of bounds");
-            }
-        }
+
         ByteBuffer readBuffer = this.readBuffer;
         while (true) {
             RecordReadStatus status = logRecord.readLogRecord(readBuffer);
@@ -256,14 +263,20 @@ public class LogReader implements ILogReader {
                     continue;
                 }
                 case TRUNCATED: {
-                    throw new ACIDException("LSN is out of bounds");
+                    if (!fillLogReadBuffer()) {
+                        throw new IllegalStateException(
+                                "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId());
+                    }
+                    //now read the complete log record
+                    continue;
                 }
                 case BAD_CHKSUM: {
                     throw new ACIDException("Log record has incorrect checksum");
                 }
                 case OK:
                     break;
-
+                default:
+                    throw new IllegalStateException("Unexpected log read status: " + status);
             }
             break;
         }
@@ -272,16 +285,20 @@ public class LogReader implements ILogReader {
         return logRecord;
     }
 
-    private void getFileChannel() throws ACIDException {
-        fileChannel = logMgr.getFileChannel(readLSN, false);
-        fileBeginLSN = readLSN;
+    private void getLogFile() throws ACIDException {
+        try {
+            logFile = logMgr.getLogFile(readLSN);
+            fileBeginLSN = logFile.getFileBeginLSN();
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
     }
 
     @Override
     public void close() throws ACIDException {
         try {
-            if (fileChannel != null) {
-                fileChannel.close();
+            if (logFile != null) {
+                logFile.close();
             }
         } catch (IOException e) {
             throw new ACIDException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5842e472/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 3e5c6cf..afb926b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -689,18 +689,34 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         int abortedJobId = txnContext.getJobId().getId();
         // Obtain the first/last log record LSNs written by the Job
         long firstLSN = txnContext.getFirstLSN();
+        /**
+         * The effect of any log record with LSN below minFirstLSN has already been written to disk and
+         * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of
+         * minFirstLSN and the job's first LSN.
+         */
+        try {
+            long localMinFirstLSN = getLocalMinFirstLSN();
+            firstLSN = Math.max(firstLSN, localMinFirstLSN);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
         long lastLSN = txnContext.getLastLSN();
-
-        LOGGER.log(Level.INFO, "rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
+        }
         // check if the transaction actually wrote some logs.
-        if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
-            LOGGER.log(Level.INFO,
-                    "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
+        if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(
+                        "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
+            }
             return;
         }
 
         // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns
-        LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+        }
 
         Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>();
         TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
@@ -812,7 +828,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
                         + entityCommitLogCount + "/" + undoCount);
             }
-
         } finally {
             logReader.close();
         }


Mime
View raw message