asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch
Date Fri, 19 Jan 2018 18:12:35 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master d8d4eefbe -> f2604d89c


[ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch

- user model changes: no
- storage format changes: no
- interface changes: yes
  - ILogBuffer: (-) setLastPage
  - ILogManager: (-) renewLogFilesAndStartFromLSN

Details:
- Survive interrupt in log file switch.
- Make LogManager responsible of closing log files.
- Remove unneeded methods in ILogManager and ILogBuffer.
- Adapt log file switch test case to new behavior.

Change-Id: I191564c510c0555f191a35e2603e051bbef24540
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2301
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f2604d89
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f2604d89
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f2604d89

Branch: refs/heads/master
Commit: f2604d89c42f8b6659288df195f9c082f417e912
Parents: d8d4eef
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Fri Jan 19 18:47:09 2018 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Fri Jan 19 10:12:00 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/test/txn/LogManagerTest.java | 47 ++++++------------
 .../asterix/common/transactions/ILogBuffer.java |  5 --
 .../common/transactions/ILogManager.java        |  8 ---
 .../apache/asterix/common/utils/InvokeUtil.java | 33 +++++++++++++
 .../management/service/logging/LogBuffer.java   | 21 ++------
 .../management/service/logging/LogManager.java  | 52 ++++++++++----------
 6 files changed, 78 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index b14d70b..a1978eb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -19,8 +19,6 @@
 package org.apache.asterix.test.txn;
 
 import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,7 +44,6 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.test.common.TestTupleReference;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants;
-import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.junit.After;
@@ -59,6 +56,7 @@ public class LogManagerTest {
     protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
     private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
     private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile";
+    private static final String ENSURE_LAST_PAGE_FLUSHED_METHOD = "ensureLastPageFlushed";
 
     @Before
     public void setUp() throws Exception {
@@ -146,39 +144,20 @@ public class LogManagerTest {
         int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
 
         // ensure an interrupted transactor will create next log file but will fail to position
the log channel
-        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        final AtomicBoolean failed = new AtomicBoolean(false);
         Thread interruptedTransactor = new Thread(() -> {
             Thread.currentThread().interrupt();
             try {
                 prepareNextLogFile(logManager);
             } catch (Exception e) {
-                Throwable rootCause = ExceptionUtils.getRootCause(e);
-                if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException)
{
-                    interrupted.set(true);
-                }
+                failed.set(true);
             }
         });
         interruptedTransactor.start();
         interruptedTransactor.join();
-        // ensure a new log file was created but the thread was interrupt
+        // ensure a new log file was created and survived interrupt
         int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
         Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
-        Assert.assertTrue(interrupted.get());
-
-        // ensure next transactor will not create another file
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        Thread transactor = new Thread(() -> {
-            try {
-                prepareNextLogFile(logManager);
-            } catch (Exception e) {
-                failed.set(true);
-            }
-        });
-        transactor.start();
-        transactor.join();
-        // make sure no new files were created and the operation was successful
-        int countAfterTransactor = logManager.getLogFileIds().size();
-        Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor);
         Assert.assertFalse(failed.get());
 
         // make sure we can still log to the new file
@@ -196,14 +175,20 @@ public class LogManagerTest {
     }
 
     private static void prepareNextLogFile(LogManager logManager) throws Exception {
-        Method method;
+        Method ensureLastPageFlushed;
+        Method prepareNextLogFile;
+        String targetMethod = null;
         try {
-            method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD, null);
+            targetMethod = ENSURE_LAST_PAGE_FLUSHED_METHOD;
+            ensureLastPageFlushed = LogManager.class.getDeclaredMethod(targetMethod, null);
+            targetMethod = PREPARE_NEXT_LOG_FILE_METHOD;
+            prepareNextLogFile = LogManager.class.getDeclaredMethod(targetMethod, null);
         } catch (Exception e) {
-            throw new IllegalStateException(
-                    "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was
it renamed?");
+            throw new IllegalStateException("Couldn't find " + targetMethod + " in LogManager.
Was it renamed?");
         }
-        method.setAccessible(true);
-        method.invoke(logManager, null);
+        ensureLastPageFlushed.setAccessible(true);
+        ensureLastPageFlushed.invoke(logManager, null);
+        prepareNextLogFile.setAccessible(true);
+        prepareNextLogFile.invoke(logManager, null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 6bdce73..b4a7d38 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -62,11 +62,6 @@ public interface ILogBuffer {
     void reset();
 
     /**
-     * Set current page to be the last page of the associated file
-     */
-    void setLastPage();
-
-    /**
      * stops the log buffer
      */
     void stop();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index aa018ba..d7e0885 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -63,14 +63,6 @@ public interface ILogManager {
     public String getNodeId();
 
     /**
-     * Delete all log files and start new log partition > LSNtoStartFrom
-     *
-     * @param LSNtoStartFrom
-     * @throws IOException
-     */
-    public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException;
-
-    /**
      * @return the log page size in bytes
      */
     public int getLogPageSize();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
index c718fca..9bdf55c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.utils;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -30,6 +31,9 @@ public class InvokeUtil {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
+    private InvokeUtil() {
+    }
+
     /**
      * Executes the passed interruptible, retrying if the operation is interrupted. Once
the interruptible
      * completes, the current thread will be re-interrupted, if the original operation was
interrupted.
@@ -144,6 +148,31 @@ public class InvokeUtil {
         return false;
     }
 
+    /**
+     * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException}
or
+     * {@link InterruptedException}. Once the interruptible completes, the current thread
will be re-interrupted, if
+     * the original operation was interrupted.
+     */
+    public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws
IOException {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (ClosedByInterruptException | InterruptedException e) {
+                    LOGGER.error("IO operation Interrupted. Retrying..", e);
+                    interrupted = true;
+                    Thread.interrupted();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
     @FunctionalInterface
     public interface Interruptible {
         void run() throws InterruptedException;
@@ -154,4 +183,8 @@ public class InvokeUtil {
         void run() throws Exception; // NOSONAR
     }
 
+    @FunctionalInterface
+    public interface ThrowingIOInterruptible {
+        void run() throws IOException, InterruptedException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 011d2a1..614591b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +55,6 @@ public class LogBuffer implements ILogBuffer {
     protected final ByteBuffer appendBuffer;
     private final ByteBuffer flushBuffer;
     private final ByteBuffer unlockBuffer;
-    private boolean isLastPage;
     protected final LinkedBlockingQueue<ILogRecord> syncCommitQ;
     protected final LinkedBlockingQueue<ILogRecord> flushQ;
     protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
@@ -76,7 +74,6 @@ public class LogBuffer implements ILogBuffer {
         full = new AtomicBoolean(false);
         appendOffset = 0;
         flushOffset = 0;
-        isLastPage = false;
         syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
         flushQ = new LinkedBlockingQueue<>();
         remoteJobsQ = new LinkedBlockingQueue<>();
@@ -132,11 +129,6 @@ public class LogBuffer implements ILogBuffer {
     }
 
     @Override
-    public void setLastPage() {
-        this.isLastPage = true;
-    }
-
-    @Override
     public boolean hasSpace(int logSize) {
         return appendOffset + logSize <= logPageSize && !full.get();
     }
@@ -152,7 +144,6 @@ public class LogBuffer implements ILogBuffer {
         full.set(false);
         appendOffset = 0;
         flushOffset = 0;
-        isLastPage = false;
         stop = false;
     }
 
@@ -174,24 +165,18 @@ public class LogBuffer implements ILogBuffer {
                                         + ", full: " + full.get());
                             }
                             if (stopping || stop) {
-                                fileChannel.close();
                                 return;
                             }
                             wait();
                         }
                         endOffset = appendOffset;
                     }
-                internalFlush(flushOffset, endOffset);
+                    internalFlush(flushOffset, endOffset);
                 } catch (InterruptedException e) {
                     interrupted = true;
                 }
             }
             internalFlush(flushOffset, appendOffset);
-            if (isLastPage) {
-                fileChannel.close();
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
         } finally {
             if (interrupted) {
                 Thread.currentThread().interrupt();
@@ -235,8 +220,8 @@ public class LogBuffer implements ILogBuffer {
                         reusableTxnId.setId(logRecord.getTxnId());
                         reusableDatasetId.setId(logRecord.getDatasetId());
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
-                        txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
-                                LockMode.ANY, txnCtx);
+                        txnSubsystem.getLockManager()
+                                .unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY,
txnCtx);
                         txnCtx.notifyEntityCommitted();
                         if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled())
{
                             txnSubsystem.incrementEntityCommitCount();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 96d0539..c226886 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -93,6 +93,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     private LogFlusher logFlusher;
     private Future<? extends Object> futureLogFlusher;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
+    private long currentLogFileId;
 
     public LogManager(ITransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
@@ -239,17 +240,18 @@ public class LogManager implements ILogManager, ILifeCycleComponent
{
     private void prepareNextLogFile() {
         final long nextFileBeginLsn = getNextFileFirstLsn();
         try {
+            closeCurrentLogFile();
             createNextLogFile();
-            setLogPosition(nextFileBeginLsn);
+            InvokeUtil.doIoUninterruptibly(() -> setLogPosition(nextFileBeginLsn));
+            // move appendLSN and flushLSN to the first LSN of the next log file
+            // only after the file was created and the channel was positioned successfully
+            appendLSN.set(nextFileBeginLsn);
+            flushLSN.set(nextFileBeginLsn);
+            LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", currentLogFileId,
+                    nextFileBeginLsn);
         } catch (IOException e) {
             throw new ACIDException(e);
         }
-        // move appendLSN and flushLSN to the first LSN of the next log file
-        // only after the file was created and the channel was positioned successfully
-        appendLSN.set(nextFileBeginLsn);
-        flushLSN.set(nextFileBeginLsn);
-        LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn),
-                nextFileBeginLsn);
     }
 
     private long getNextFileFirstLsn() {
@@ -258,8 +260,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     private void ensureLastPageFlushed() {
-        // Mark the page as the last page so that it will close the output file channel.
-        appendPage.setLastPage();
         // Make sure to flush whatever left in the log tail.
         appendPage.setFull();
         synchronized (flushLSN) {
@@ -301,6 +301,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     @Override
     public void stop(boolean dumpState, OutputStream os) {
         terminateLogFlusher();
+        closeCurrentLogFile();
         if (dumpState) {
             dumpState(os);
         }
@@ -387,6 +388,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     @Override
     public void renewLogFiles() {
         terminateLogFlusher();
+        closeCurrentLogFile();
         long lastMaxLogFileId = deleteAllLogFiles();
         initializeLogManager(lastMaxLogFileId + 1);
     }
@@ -445,13 +447,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     private long deleteAllLogFiles() {
-        if (appendChannel != null) {
-            try {
-                appendChannel.close();
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to close a fileChannel of a log file");
-            }
-        }
         txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
@@ -537,9 +532,22 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         final long fileId = getLogFileId(lsn);
         final Path targetFilePath = Paths.get(getLogFilePath(fileId));
         final long targetPosition = getLogFileOffset(lsn);
-        final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw");
// NOSONAR closed by LogBuffer
+        final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw");
// NOSONAR closed when full
         appendChannel = raf.getChannel();
         appendChannel.position(targetPosition);
+        currentLogFileId = fileId;
+    }
+
+    private void closeCurrentLogFile() {
+        if (appendChannel != null && appendChannel.isOpen()) {
+            try {
+                LOGGER.info("closing current log file with id({})", currentLogFileId);
+                appendChannel.close();
+            } catch (IOException e) {
+                LOGGER.error(() -> "failed to close log file with id(" + currentLogFileId
+ ")", e);
+                throw new ACIDException(e);
+            }
+        }
     }
 
     @Override
@@ -563,14 +571,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     @Override
-    public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException {
-        terminateLogFlusher();
-        deleteAllLogFiles();
-        long newLogFile = getLogFileId(LSNtoStartFrom);
-        initializeLogManager(newLogFile + 1);
-    }
-
-    @Override
     public void setReplicationManager(IReplicationManager replicationManager) {
         throw new IllegalStateException("This log manager does not support replication");
     }
@@ -687,7 +687,7 @@ class LogFlusher implements Callable<Boolean> {
     }
 
     @Override
-    public Boolean call() throws InterruptedException {
+    public Boolean call() {
         started.release();
         boolean interrupted = false;
         try {


Mime
View raw message