asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch
Date Fri, 19 Jan 2018 12:22:26 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2301

Change subject: [ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch
......................................................................

[ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch

- user model changes: no
- storage format changes: no
- interface changes: yes
  - ILogBuffer: (-) setLastPage
  - ILogManager: (-) renewLogFilesAndStartFromLSN

Details:
- Survive interrupt in log file switch.
- Make LogManager responsible for closing log files.
- Remove unneeded methods in ILogManager and ILogBuffer.
- Adapt log file switch test case to new behavior.

Change-Id: I191564c510c0555f191a35e2603e051bbef24540
---
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
6 files changed, 71 insertions(+), 86 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/01/2301/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index b14d70b..ec8334e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -19,8 +19,6 @@
 package org.apache.asterix.test.txn;
 
 import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,7 +44,6 @@
 import org.apache.asterix.test.common.TestTupleReference;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants;
-import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.junit.After;
@@ -146,39 +143,20 @@
         int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
 
         // ensure an interrupted transactor will create next log file but will fail to position the log channel
-        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        final AtomicBoolean failed = new AtomicBoolean(false);
         Thread interruptedTransactor = new Thread(() -> {
             Thread.currentThread().interrupt();
-            try {
-                prepareNextLogFile(logManager);
-            } catch (Exception e) {
-                Throwable rootCause = ExceptionUtils.getRootCause(e);
-                if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) {
-                    interrupted.set(true);
-                }
-            }
-        });
-        interruptedTransactor.start();
-        interruptedTransactor.join();
-        // ensure a new log file was created but the thread was interrupt
-        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
-        Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
-        Assert.assertTrue(interrupted.get());
-
-        // ensure next transactor will not create another file
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        Thread transactor = new Thread(() -> {
             try {
                 prepareNextLogFile(logManager);
             } catch (Exception e) {
                 failed.set(true);
             }
         });
-        transactor.start();
-        transactor.join();
-        // make sure no new files were created and the operation was successful
-        int countAfterTransactor = logManager.getLogFileIds().size();
-        Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor);
+        interruptedTransactor.start();
+        interruptedTransactor.join();
+        // ensure a new log file was created and survived interrupt
+        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+        Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
         Assert.assertFalse(failed.get());
 
         // make sure we can still log to the new file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 6bdce73..b4a7d38 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -62,11 +62,6 @@
     void reset();
 
     /**
-     * Set current page to be the last page of the associated file
-     */
-    void setLastPage();
-
-    /**
      * stops the log buffer
      */
     void stop();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index aa018ba..d7e0885 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -63,14 +63,6 @@
     public String getNodeId();
 
     /**
-     * Delete all log files and start new log partition > LSNtoStartFrom
-     *
-     * @param LSNtoStartFrom
-     * @throws IOException
-     */
-    public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException;
-
-    /**
      * @return the log page size in bytes
      */
     public int getLogPageSize();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
index c718fca..7d4edbf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.utils;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -29,6 +30,9 @@
 public class InvokeUtil {
 
     private static final Logger LOGGER = LogManager.getLogger();
+
+    private InvokeUtil() {
+    }
 
     /**
      * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
@@ -144,6 +148,31 @@
         return false;
     }
 
+    /**
+     * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException}.
+     * Once the interruptible completes, the current thread will be re-interrupted, if the original operation was
+     * interrupted.
+     */
+    public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (ClosedByInterruptException e) {
+                    LOGGER.error("IO operation Interrupted. Retrying..", e);
+                    interrupted = true;
+                    Thread.interrupted();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
     @FunctionalInterface
     public interface Interruptible {
         void run() throws InterruptedException;
@@ -154,4 +183,8 @@
         void run() throws Exception; // NOSONAR
     }
 
+    @FunctionalInterface
+    public interface ThrowingIOInterruptible {
+        void run() throws IOException;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 011d2a1..614591b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +55,6 @@
     protected final ByteBuffer appendBuffer;
     private final ByteBuffer flushBuffer;
     private final ByteBuffer unlockBuffer;
-    private boolean isLastPage;
     protected final LinkedBlockingQueue<ILogRecord> syncCommitQ;
     protected final LinkedBlockingQueue<ILogRecord> flushQ;
     protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
@@ -76,7 +74,6 @@
         full = new AtomicBoolean(false);
         appendOffset = 0;
         flushOffset = 0;
-        isLastPage = false;
         syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
         flushQ = new LinkedBlockingQueue<>();
         remoteJobsQ = new LinkedBlockingQueue<>();
@@ -132,11 +129,6 @@
     }
 
     @Override
-    public void setLastPage() {
-        this.isLastPage = true;
-    }
-
-    @Override
     public boolean hasSpace(int logSize) {
         return appendOffset + logSize <= logPageSize && !full.get();
     }
@@ -152,7 +144,6 @@
         full.set(false);
         appendOffset = 0;
         flushOffset = 0;
-        isLastPage = false;
         stop = false;
     }
 
@@ -174,24 +165,18 @@
                                         + ", full: " + full.get());
                             }
                             if (stopping || stop) {
-                                fileChannel.close();
                                 return;
                             }
                             wait();
                         }
                         endOffset = appendOffset;
                     }
-                internalFlush(flushOffset, endOffset);
+                    internalFlush(flushOffset, endOffset);
                 } catch (InterruptedException e) {
                     interrupted = true;
                 }
             }
             internalFlush(flushOffset, appendOffset);
-            if (isLastPage) {
-                fileChannel.close();
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
         } finally {
             if (interrupted) {
                 Thread.currentThread().interrupt();
@@ -235,8 +220,8 @@
                         reusableTxnId.setId(logRecord.getTxnId());
                         reusableDatasetId.setId(logRecord.getDatasetId());
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
-                        txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
-                                LockMode.ANY, txnCtx);
+                        txnSubsystem.getLockManager()
+                                .unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
                         txnCtx.notifyEntityCommitted();
                         if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                             txnSubsystem.incrementEntityCommitCount();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 96d0539..4727b09 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -93,6 +93,7 @@
     private LogFlusher logFlusher;
     private Future<? extends Object> futureLogFlusher;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
+    private long currentLogFileId;
 
     public LogManager(ITransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
@@ -239,17 +240,20 @@
     private void prepareNextLogFile() {
         final long nextFileBeginLsn = getNextFileFirstLsn();
         try {
-            createNextLogFile();
-            setLogPosition(nextFileBeginLsn);
+            InvokeUtil.doIoUninterruptibly(() -> {
+                closeCurrentLogFile();
+                createNextLogFile();
+                setLogPosition(nextFileBeginLsn);
+                // move appendLSN and flushLSN to the first LSN of the next log file
+                // only after the file was created and the channel was positioned successfully
+                appendLSN.set(nextFileBeginLsn);
+                flushLSN.set(nextFileBeginLsn);
+                LOGGER.info("Created new txn log file with id({}) starting with LSN = {}",
+                        getLogFileId(nextFileBeginLsn), nextFileBeginLsn);
+            });
         } catch (IOException e) {
             throw new ACIDException(e);
         }
-        // move appendLSN and flushLSN to the first LSN of the next log file
-        // only after the file was created and the channel was positioned successfully
-        appendLSN.set(nextFileBeginLsn);
-        flushLSN.set(nextFileBeginLsn);
-        LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn),
-                nextFileBeginLsn);
     }
 
     private long getNextFileFirstLsn() {
@@ -258,8 +262,6 @@
     }
 
     private void ensureLastPageFlushed() {
-        // Mark the page as the last page so that it will close the output file channel.
-        appendPage.setLastPage();
         // Make sure to flush whatever left in the log tail.
         appendPage.setFull();
         synchronized (flushLSN) {
@@ -301,6 +303,7 @@
     @Override
     public void stop(boolean dumpState, OutputStream os) {
         terminateLogFlusher();
+        closeCurrentLogFile();
         if (dumpState) {
             dumpState(os);
         }
@@ -387,6 +390,7 @@
     @Override
     public void renewLogFiles() {
         terminateLogFlusher();
+        closeCurrentLogFile();
         long lastMaxLogFileId = deleteAllLogFiles();
         initializeLogManager(lastMaxLogFileId + 1);
     }
@@ -445,13 +449,6 @@
     }
 
     private long deleteAllLogFiles() {
-        if (appendChannel != null) {
-            try {
-                appendChannel.close();
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to close a fileChannel of a log file");
-            }
-        }
         txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
@@ -537,9 +534,22 @@
         final long fileId = getLogFileId(lsn);
         final Path targetFilePath = Paths.get(getLogFilePath(fileId));
         final long targetPosition = getLogFileOffset(lsn);
-        final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer
+        final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed when full
         appendChannel = raf.getChannel();
         appendChannel.position(targetPosition);
+        currentLogFileId = fileId;
+    }
+
+    private void closeCurrentLogFile() {
+        if (appendChannel != null && appendChannel.isOpen()) {
+            try {
+                LOGGER.info("closing current log file with id({})", currentLogFileId);
+                appendChannel.close();
+            } catch (IOException e) {
+                LOGGER.error(() -> "failed to close log file with id(" + currentLogFileId + ")", e);
+                throw new ACIDException(e);
+            }
+        }
     }
 
     @Override
@@ -560,14 +570,6 @@
     @Override
     public int getLogPageSize() {
         return logPageSize;
-    }
-
-    @Override
-    public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException {
-        terminateLogFlusher();
-        deleteAllLogFiles();
-        long newLogFile = getLogFileId(LSNtoStartFrom);
-        initializeLogManager(newLogFile + 1);
     }
 
     @Override
@@ -687,7 +689,7 @@
     }
 
     @Override
-    public Boolean call() throws InterruptedException {
+    public Boolean call() {
         started.release();
         boolean interrupted = false;
         try {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2301
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I191564c510c0555f191a35e2603e051bbef24540
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhubail@apache.org>

Mime
View raw message