asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ian Maxon (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1045: Log analysis fixes
Date Wed, 14 Oct 2015 00:58:53 GMT
Ian Maxon has submitted this change and it was merged.

Change subject: ASTERIXDB-1045: Log analysis fixes
......................................................................


ASTERIXDB-1045: Log analysis fixes

-Avoid using exceptions for control flow in LogRecord
-Rename LogPage and ilk to LogBuffer
-Busywait on read() to fill entire buffer for fillLogBuffer rather than failing
-Distinguish between log truncation and checksum corruption

TODOs:
- Log IO and parsing still happen in lock-step.
- Busywaiting for read to return something other than 0 is unfortunate

Change-Id: I1658e938eb0f199f748407361ffee4833aac661c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/289
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
---
R asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
M asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
R asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
R asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
8 files changed, 229 insertions(+), 97 deletions(-)

Approvals:
  Young-Seok Kim: Looks good to me, but someone else must approve
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified



diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
similarity index 96%
rename from asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java
rename to asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index bd174b5..9e28cda 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.common.transactions;
 
-public interface ILogPage {
+public interface ILogBuffer {
 
     public void append(ILogRecord logRecord, long appendLsn);
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 90595e3..16c51fe 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -28,8 +28,15 @@
     public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
     public static final int UPDATE_LOG_BASE_SIZE = 54;
     public static final int FLUSH_LOG_SIZE = 17;
-    
-    public boolean readLogRecord(ByteBuffer buffer);
+
+
+    public enum RECORD_STATUS{
+        TRUNCATED,
+        BAD_CHKSUM,
+        OK
+    }
+
+    public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index c0b71e6..60e3097 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -83,7 +83,6 @@
     //------------- fields in a log record (end) --------------//
 
     private int PKFieldCnt;
-    private static final int CHECKSUM_SIZE = 8;
     private ITransactionContext txnCtx;
     private long LSN;
     private final AtomicBoolean isFlushed;
@@ -101,6 +100,24 @@
         readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
     }
+
+    private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE;
+    private final static int JID_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int DSID_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
+    private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
+    private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE;
+    private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
+
+    private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN;
+    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN;
+    private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
+    private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
 
     @Override
     public void writeLogRecord(ByteBuffer buffer) {
@@ -130,7 +147,7 @@
             buffer.putInt(datasetId);
         }
         
-        checksum = generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE);
+        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
 
@@ -154,52 +171,79 @@
     }
 
     @Override
-    public boolean readLogRecord(ByteBuffer buffer) {
+    public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
-        try {
-            logType = buffer.get();
-            jobId = buffer.getInt();
-            if(logType != LogType.FLUSH)
-            {
-                if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
-                    datasetId = -1;
-                    PKHashValue = -1;
-                } else {
-                    datasetId = buffer.getInt();
-                    PKHashValue = buffer.getInt();
-                    PKValueSize = buffer.getInt();
-                    if (PKValueSize <= 0) {
-                        throw new IllegalStateException("Primary Key Size is less than or
equal to 0");
-                    }
-                    PKValue = readPKValue(buffer);
-                }
-                if (logType == LogType.UPDATE) {
-                    prevLSN = buffer.getLong();
-                    resourceId = buffer.getLong();
-                    logSize = buffer.getInt();
-                    fieldCnt = buffer.getInt();
-                    newOp = buffer.get();
-                    newValueSize = buffer.getInt();
-                    newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
-                } else {
-                    computeAndSetLogSize();
-                }
-            }
-            else{
-                computeAndSetLogSize();
-                datasetId = buffer.getInt();
-                resourceId = 0l;
-            }
-            
-            checksum = buffer.getLong();
-            if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE))
{
-                throw new IllegalStateException();
-            }
-        } catch (BufferUnderflowException e) {
+        //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
+        if(buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             buffer.position(beginOffset);
-            return false;
+            return RECORD_STATUS.TRUNCATED;
         }
-        return true;
+        logType = buffer.get();
+        jobId = buffer.getInt();
+        if(logType != LogType.FLUSH)
+        {
+            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+                datasetId = -1;
+                PKHashValue = -1;
+            } else {
+                //attempt to read in the dsid, PK hash and PK length
+                if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){
+                    buffer.position(beginOffset);
+                    return RECORD_STATUS.TRUNCATED;
+                }
+                datasetId = buffer.getInt();
+                PKHashValue = buffer.getInt();
+                PKValueSize = buffer.getInt();
+                //attempt to read in the PK
+                if(buffer.remaining() < PKValueSize){
+                    buffer.position(beginOffset);
+                    return RECORD_STATUS.TRUNCATED;
+                }
+                if (PKValueSize <= 0) {
+                    throw new IllegalStateException("Primary Key Size is less than or equal
to 0");
+                }
+                PKValue = readPKValue(buffer);
+            }
+            if (logType == LogType.UPDATE) {
+                //attempt to read in the previous LSN, log size, new value size, and new
record type
+                if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){
+                    buffer.position(beginOffset);
+                    return RECORD_STATUS.TRUNCATED;
+                }
+                prevLSN = buffer.getLong();
+                resourceId = buffer.getLong();
+                logSize = buffer.getInt();
+                fieldCnt = buffer.getInt();
+                newOp = buffer.get();
+                newValueSize = buffer.getInt();
+                if(buffer.remaining() < newValueSize){
+                    buffer.position(beginOffset);
+                    return RECORD_STATUS.TRUNCATED;
+                }
+                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+            } else {
+                computeAndSetLogSize();
+            }
+        }
+        else{
+            computeAndSetLogSize();
+            if(buffer.remaining() < DSID_LEN){
+                buffer.position(beginOffset);
+                return RECORD_STATUS.TRUNCATED;
+            }
+            datasetId = buffer.getInt();
+            resourceId = 0l;
+        }
+        //atempt to read checksum
+        if(buffer.remaining() < CHKSUM_LEN){
+            buffer.position(beginOffset);
+            return RECORD_STATUS.TRUNCATED;
+        }
+        checksum = buffer.getLong();
+        if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
+            return RECORD_STATUS.BAD_CHKSUM;
+        }
+        return RECORD_STATUS.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
index 6f36a6a..ee9fd84 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
@@ -37,8 +37,8 @@
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.transaction.management.service.logging.LogPage;
-import org.apache.asterix.transaction.management.service.logging.LogPageReader;
+import org.apache.asterix.transaction.management.service.logging.LogBuffer;
+import org.apache.asterix.transaction.management.service.logging.LogBufferTailReader;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -2204,11 +2204,11 @@
         }
     }
 
-    public void batchUnlock(LogPage logPage, LogPageReader logPageReader) throws ACIDException
{
+    public void batchUnlock(LogBuffer logPage, LogBufferTailReader logBufferTailReader) throws
ACIDException {
         latchLockTable();
         try {
             ITransactionContext txnCtx = null;
-            LogRecord logRecord = logPageReader.next();
+            LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                     tempDatasetIdObj.setId(logRecord.getDatasetId());
@@ -2222,7 +2222,7 @@
                     txnCtx.notifyOptracker(true);
                     logPage.notifyJobTerminator();
                 }
-                logRecord = logPageReader.next();
+                logRecord = logBufferTailReader.next();
             }
         } finally {
             unlatchLockTable();
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
similarity index 93%
rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java
rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 53e17d4..4d50294 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILogPage;
+import org.apache.asterix.common.transactions.ILogBuffer;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.JobId;
@@ -39,12 +39,12 @@
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class LogPage implements ILogPage {
+public class LogBuffer implements ILogBuffer {
 
     public static final boolean IS_DEBUG_MODE = false;//true
-    private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(LogBuffer.class.getName());
     private final TransactionSubsystem txnSubsystem;
-    private final LogPageReader logPageReader;
+    private final LogBufferTailReader logBufferTailReader;
     private final int logPageSize;
     private final MutableLong flushLSN;
     private final AtomicBoolean full;
@@ -62,14 +62,14 @@
     private final DatasetId reusableDsId;
     private final JobId reusableJobId;
 
-    public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN)
{
+    public LogBuffer(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN)
{
         this.txnSubsystem = txnSubsystem;
         this.logPageSize = logPageSize;
         this.flushLSN = flushLSN;
         appendBuffer = ByteBuffer.allocate(logPageSize);
         flushBuffer = appendBuffer.duplicate();
         unlockBuffer = appendBuffer.duplicate();
-        logPageReader = getLogPageReader();
+        logBufferTailReader = getLogBufferTailReader();
         full = new AtomicBoolean(false);
         appendOffset = 0;
         flushOffset = 0;
@@ -206,17 +206,17 @@
         }
     }
 
-    private LogPageReader getLogPageReader() {
-        return new LogPageReader(unlockBuffer);
+    private LogBufferTailReader getLogBufferTailReader() {
+        return new LogBufferTailReader(unlockBuffer);
     }
 
     private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
         if (endOffset > beginOffset) {
-            logPageReader.initializeScan(beginOffset, endOffset);
+            logBufferTailReader.initializeScan(beginOffset, endOffset);
 
             ITransactionContext txnCtx = null;
 
-            LogRecord logRecord = logPageReader.next();
+            LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                     reusableJobId.setId(logRecord.getJobId());
@@ -234,7 +234,7 @@
                     notifyFlushTerminator();
                 }
 
-                logRecord = logPageReader.next();
+                logRecord = logBufferTailReader.next();
             }
         }
     }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
similarity index 78%
rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java
rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
index 76648ae..f8e0253 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
@@ -20,15 +20,17 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
 import org.apache.asterix.common.transactions.LogRecord;
 
-public class LogPageReader {
+public class LogBufferTailReader {
 
     private final ByteBuffer buffer;
     private final LogRecord logRecord;
     private int endOffset;
 
-    public LogPageReader(ByteBuffer buffer) {
+    public LogBufferTailReader(ByteBuffer buffer) {
         this.buffer = buffer;
         logRecord = new LogRecord();
     }
@@ -42,7 +44,9 @@
         if (buffer.position() == endOffset) {
             return null;
         }
-        if (!logRecord.readLogRecord(buffer)) {
+        RECORD_STATUS status = logRecord.readLogRecord(buffer);
+        //underflow is not expected because we are at the very tail of the current log buffer
+        if (status != RECORD_STATUS.OK) {
             throw new IllegalStateException();
         }
         return logRecord;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index b531961..f14c146 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -62,11 +62,11 @@
     private final String logDir;
     private final String logFilePrefix;
     private final MutableLong flushLSN;
-    private LinkedBlockingQueue<LogPage> emptyQ;
-    private LinkedBlockingQueue<LogPage> flushQ;
+    private LinkedBlockingQueue<LogBuffer> emptyQ;
+    private LinkedBlockingQueue<LogBuffer> flushQ;
     private final AtomicLong appendLSN;
     private FileChannel appendChannel;
-    private LogPage appendPage;
+    private LogBuffer appendPage;
     private LogFlusher logFlusher;
     private Future<Object> futureLogFlusher;
     private static final long SMALLEST_LOG_FILE_ID = 0;
@@ -86,10 +86,10 @@
     }
 
     private void initializeLogManager(long nextLogFileId) {
-        emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
-        flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
+        emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
+        flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
+            emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
         }
         appendLSN.set(initializeLogAnchor(nextLogFileId));
         flushLSN.set(appendLSN.get());
@@ -174,7 +174,7 @@
         appendPage.isLastPage(true);
         //[Notice]
         //the current log file channel is closed if 
-        //LogPage.flush() completely flush the last page of the file.
+        //LogBuffer.flush() completely flush the last page of the file.
     }
 
     @Override
@@ -443,15 +443,15 @@
 
 class LogFlusher implements Callable<Boolean> {
     private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
-    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE,
null);
+    private final static LogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE,
null);
     private final LogManager logMgr;//for debugging
-    private final LinkedBlockingQueue<LogPage> emptyQ;
-    private final LinkedBlockingQueue<LogPage> flushQ;
-    private LogPage flushPage;
+    private final LinkedBlockingQueue<LogBuffer> emptyQ;
+    private final LinkedBlockingQueue<LogBuffer> flushQ;
+    private LogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage>
flushQ) {
+    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer>
flushQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 6fa0ebb..9900468 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -29,6 +29,12 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
 
+import static org.apache.asterix.common.transactions.LogRecord.*;
+
+/**
+ * NOTE: Many method calls of this class are not thread safe.
+ * Be very cautious using it in a multithreaded context.
+ */
 public class LogReader implements ILogReader {
 
     public static final boolean IS_DEBUG_MODE = false;//true
@@ -67,20 +73,56 @@
             return;
         }
         getFileChannel();
-        readPage();
+        fillLogReadBuffer();
     }
 
-    //for scanning
+    /**
+     * Get the next log record from the log file.
+     * @return A deserialized log record, or null if we have reached the end of the file.
+     * @throws ACIDException
+     */
     @Override
     public ILogRecord next() throws ACIDException {
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return null;
         }
-        if (readBuffer.position() == readBuffer.limit() || !logRecord.readLogRecord(readBuffer))
{
-            readNextPage();
-            if (!logRecord.readLogRecord(readBuffer)) {
-                throw new IllegalStateException();
+        if (readBuffer.position() == readBuffer.limit()) {
+            boolean eof = refillLogReadBuffer();
+            if (eof && isRecoveryMode && readLSN < flushLSN.get()) {
+                LOGGER.severe("Transaction log ends before expected. Log files may be missing.");
+                return null;
             }
+        }
+
+        RECORD_STATUS status = logRecord.readLogRecord(readBuffer);
+        switch(status) {
+            case TRUNCATED: {
+                //we may have just read off the end of the buffer, so try refiling it
+                if(!refillLogReadBuffer()) {
+                    return null;
+                }
+                //now see what we have in the refilled buffer
+                status = logRecord.readLogRecord(readBuffer);
+                switch(status){
+                    case TRUNCATED: {
+                        LOGGER.info("Log file has truncated log records.");
+                        return null;
+                    }
+                    case BAD_CHKSUM:{
+                        LOGGER.severe("Transaction log contains corrupt log records (perhaps
due to medium error). Stopping recovery early.");
+                        return null;
+                    }
+                    case OK: break;
+                }
+                //if we have exited the inner switch,
+                // this means status is really "OK" after buffer refill
+                break;
+            }
+            case BAD_CHKSUM:{
+                LOGGER.severe("Transaction log contains corrupt log records (perhaps due
to medium error). Stopping recovery early.");
+                return null;
+            }
+            case OK: break;
         }
         logRecord.setLSN(readLSN);
         readLSN += logRecord.getLogSize();
@@ -107,32 +149,55 @@
         }
     }
 
-    private void readNextPage() throws ACIDException {
+    /**
+     * Continues log analysis between log file splits.
+     * @return true if log continues, false if EOF
+     * @throws ACIDException
+     */
+    private boolean refillLogReadBuffer() throws ACIDException {
         try {
             if (readLSN % logFileSize == fileChannel.size()) {
                 fileChannel.close();
                 readLSN += logFileSize - (readLSN % logFileSize);
                 getFileChannel();
             }
-            readPage();
+            return fillLogReadBuffer();
         } catch (IOException e) {
             throw new ACIDException(e);
         }
     }
 
-    private void readPage() throws ACIDException {
-        int size;
+    /**
+     * Fills the log buffer with data from the log file at the current position
+     * @return false if EOF, true otherwise
+     * @throws ACIDException
+     */
+
+    private boolean fillLogReadBuffer() throws ACIDException {
+        int size=0;
+        int read=0;
         readBuffer.position(0);
         readBuffer.limit(logPageSize);
         try {
             fileChannel.position(readLSN % logFileSize);
-            size = fileChannel.read(readBuffer);
+            //We loop here because read() may return 0, but this simply means we are waiting
on IO.
+            //Therefore we want to break out only when either the buffer is full, or we reach
EOF.
+            while( size < logPageSize && read != -1) {
+                read = fileChannel.read(readBuffer);
+                if(read>0) {
+                    size += read;
+                }
+            }
         } catch (IOException e) {
             throw new ACIDException(e);
         }
         readBuffer.position(0);
         readBuffer.limit(size);
+        if(size == 0 && read == -1){
+            return false; //EOF
+        }
         bufferBeginLSN = readLSN;
+        return true;
     }
 
     //for random reading
@@ -151,25 +216,37 @@
         try {
             if (fileChannel == null) {
                 getFileChannel();
-                readPage();
+                fillLogReadBuffer();
             } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size())
{
                 fileChannel.close();
                 getFileChannel();
-                readPage();
+                fillLogReadBuffer();
             } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit())
{
-                readPage();
+                fillLogReadBuffer();
             } else {
                 readBuffer.position((int) (readLSN - bufferBeginLSN));
             }
         } catch (IOException e) {
             throw new ACIDException(e);
         }
-        if (!logRecord.readLogRecord(readBuffer)) {
-            readNextPage();
-            if (!logRecord.readLogRecord(readBuffer)) {
-                throw new IllegalStateException();
+        boolean hasRemaining;
+        if(readBuffer.position() == readBuffer.limit()){
+            hasRemaining = refillLogReadBuffer();
+            if(!hasRemaining){
+                throw new ACIDException("LSN is out of bounds");
             }
         }
+        RECORD_STATUS status = logRecord.readLogRecord(readBuffer);
+        switch(status){
+            case TRUNCATED:{
+                throw new ACIDException("LSN is out of bounds");
+            }
+            case BAD_CHKSUM:{
+                throw new ACIDException("Log record has incorrect checksum");
+            }
+            case OK: break;
+
+        }
         logRecord.setLSN(readLSN);
         readLSN += logRecord.getLogSize();
         return logRecord;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/289
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1658e938eb0f199f748407361ffee4833aac661c
Gerrit-PatchSet: 19
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Ian Maxon <imaxon@apache.org>
Gerrit-Reviewer: Ian Maxon <imaxon@apache.org>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hubailmor@gmail.com>
Gerrit-Reviewer: Young-Seok Kim <kisskys@gmail.com>

Mime
View raw message