asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject incubator-asterixdb git commit: ASTERIXDB-1160: Break deadlock between LogManager and LogFlusher
Date Mon, 09 Nov 2015 06:10:21 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 39e9c728b -> eacff7818


ASTERIXDB-1160: Break deadlock between LogManager and LogFlusher

This change introduces a new thread in LogManager to log FLUSH logs.
This will prevent LogFlusher thread from waiting on itself until FLUSH logs are flushed to
disk.

Change-Id: I0b414f5ab92e1c68c8aafbaab859c644ba399dcb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/475
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/eacff781
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/eacff781
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/eacff781

Branch: refs/heads/master
Commit: eacff78183c3f647205e5768d51488ef6a6dae8a
Parents: 39e9c72
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Thu Nov 5 19:38:01 2015 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Sun Nov 8 22:06:54 2015 -0800

----------------------------------------------------------------------
 asterix-lang-common/pom.xml                     |  4 +-
 .../management/service/logging/LogManager.java  | 55 ++++++++++++++++----
 2 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/eacff781/asterix-lang-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-lang-common/pom.xml b/asterix-lang-common/pom.xml
index 974212b..1e22da0 100644
--- a/asterix-lang-common/pom.xml
+++ b/asterix-lang-common/pom.xml
@@ -43,8 +43,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                     <fork>true</fork>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/eacff781/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f14c146..87d0ad8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -70,8 +70,10 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     private LogFlusher logFlusher;
     private Future<Object> futureLogFlusher;
     private static final long SMALLEST_LOG_FILE_ID = 0;
+    private LinkedBlockingQueue<ILogRecord> flushLogsQ;
+    private final FlushLogsLogger flushLogsLogger;
 
-    public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+    public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
         logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
                 this.txnSubsystem.getId());
@@ -82,6 +84,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
         appendLSN = new AtomicLong();
+        flushLogsQ = new LinkedBlockingQueue<>();
+        flushLogsLogger = new FlushLogsLogger();
         initializeLogManager(SMALLEST_LOG_FILE_ID);
     }
 
@@ -100,6 +104,9 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         getAndInitNewPage();
         logFlusher = new LogFlusher(this, emptyQ, flushQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
+        if (!flushLogsLogger.isAlive()) {
+            txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
+        }
     }
 
     @Override
@@ -108,7 +115,15 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             throw new IllegalStateException();
         }
 
-        syncLog(logRecord);
+        if (logRecord.getLogType() == LogType.FLUSH) {
+            flushLogsQ.offer(logRecord);
+            return;
+        }
+        appendToLogTail(logRecord);
+    }
+
+    private void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+        syncAppendToLogTail(logRecord);
 
         if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
                 && !logRecord.isFlushed()) {
@@ -124,16 +139,15 @@ public class LogManager implements ILogManager, ILifeCycleComponent
{
         }
     }
 
-    private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
+    private synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException
{
         ITransactionContext txnCtx = null;
 
         if (logRecord.getLogType() != LogType.FLUSH) {
             txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType()
!= LogType.ABORT) {
-                throw new ACIDException("Aborted job(" + txnCtx.getJobId()
-                        + ") tried to write non-abort type log record.");
+                throw new ACIDException(
+                        "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort
type log record.");
             }
-
         }
         if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize)
{
             prepareNextLogFile();
@@ -394,7 +408,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         return lsn / logFileSize;
     }
 
-    private boolean createFileIfNotExists(String path) throws IOException {
+    private static boolean createFileIfNotExists(String path) throws IOException {
         File file = new File(path);
         File parentFile = file.getParentFile();
         if (parentFile != null) {
@@ -403,7 +417,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         return file.createNewFile();
     }
 
-    private boolean createNewDirectory(String path) throws IOException {
+    private static boolean createNewDirectory(String path) {
         return (new File(path)).mkdir();
     }
 
@@ -435,10 +449,33 @@ public class LogManager implements ILogManager, ILifeCycleComponent
{
         List<Long> logFileIds = getLogFileIds();
         if (logFileIds != null) {
             return logFileIds.get(0) * logFileSize;
-        }else{
+        } else {
             throw new IllegalStateException("Couldn't find any log files.");
         }
     }
+
+    /**
+     * This class is used to log FLUSH logs.
+     * FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer
batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation
+     * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in
generating a FLUSH log and there are no empty log buffers available to log it.
+     */
+    private class FlushLogsLogger extends Thread {
+        private ILogRecord logRecord;
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    logRecord = flushLogsQ.take();
+                    appendToLogTail(logRecord);
+                } catch (ACIDException e) {
+                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+            }
+        }
+    }
 }
 
 class LogFlusher implements Callable<Boolean> {


Mime
View raw message