asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Ensure LogManager Doesn't Exceed the Size of the Log Page Qu...
Date Fri, 04 Nov 2016 21:46:42 GMT
Michael Blow has submitted this change and it was merged.

Change subject: Ensure LogManager Doesn't Exceed the Size of the Log Page Queues
......................................................................


Ensure LogManager Doesn't Exceed the Size of the Log Page Queues

Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1342
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
1 file changed, 22 insertions(+), 13 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified; Verified

Objections:
  Jenkins: Violations found



diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 814bcfc..57d5c39 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -67,6 +67,7 @@
     private final MutableLong flushLSN;
     private LinkedBlockingQueue<LogBuffer> emptyQ;
     private LinkedBlockingQueue<LogBuffer> flushQ;
+    private LinkedBlockingQueue<LogBuffer> stashQ;
     protected final AtomicLong appendLSN;
     private FileChannel appendChannel;
     protected LogBuffer appendPage;
@@ -97,8 +98,9 @@
     }
 
     private void initializeLogManager(long nextLogFileId) {
-        emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
-        flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
+        emptyQ = new LinkedBlockingQueue<>(numLogPages);
+        flushQ = new LinkedBlockingQueue<>(numLogPages);
+        stashQ = new LinkedBlockingQueue<>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
         }
@@ -109,7 +111,7 @@
         }
         appendChannel = getFileChannel(appendLSN.get(), false);
         getAndInitNewPage(INITIAL_LOG_SIZE);
-        logFlusher = new LogFlusher(this, emptyQ, flushQ);
+        logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
         if (!flushLogsLogger.isAlive()) {
             txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger);
@@ -182,8 +184,18 @@
 
     protected void getAndInitNewPage(int logSize) {
         if (logSize > logPageSize) {
+            // before creating a new page, we need to stash a normal sized page since our
queues have fixed capacity
+            appendPage = null;
+            while (appendPage == null) {
+                try {
+                    appendPage = emptyQ.take();
+                    stashQ.add(appendPage);
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+            }
             // for now, alloc a new buffer for each large page
-            // TODO: pool large pages
+            // TODO: pool large pages??
             appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
             appendPage.setFileChannel(appendChannel);
             flushQ.offer(appendPage);
@@ -299,10 +311,6 @@
                 e.printStackTrace();
             }
         }
-    }
-
-    public MutableLong getFlushLSN() {
-        return flushLSN;
     }
 
     private long initializeLogAnchor(long nextLogFileId) {
@@ -440,7 +448,7 @@
                 }
             });
             if (logFileNames != null && logFileNames.length != 0) {
-                logFileIds = new ArrayList<Long>();
+                logFileIds = new ArrayList<>();
                 for (String fileName : logFileNames) {
                     logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length()
+ 1)));
                 }
@@ -621,14 +629,17 @@
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<LogBuffer> emptyQ;
     private final LinkedBlockingQueue<LogBuffer> flushQ;
+    private final LinkedBlockingQueue<LogBuffer> stashQ;
     private LogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer>
flushQ) {
+    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer>
flushQ,
+            LinkedBlockingQueue<LogBuffer> stashQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
+        this.stashQ = stashQ;
         flushPage = null;
         isStarted = new AtomicBoolean(false);
         terminateFlag = new AtomicBoolean(false);
@@ -680,9 +691,7 @@
                     }
                 }
                 flushPage.flush();
-                if (flushPage.getLogPageSize() == logMgr.getLogPageSize()) {
-                    emptyQ.offer(flushPage);
-                }
+                emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage
: stashQ.remove());
             }
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.INFO)) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1342
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mblow@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message