asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [NO ISSUE][STO] Perform IO reads in uninterruptible threads
Date Tue, 13 Feb 2018 06:21:00 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2387

Change subject: [NO ISSUE][STO] Perform IO reads in uninterruptible threads
......................................................................

[NO ISSUE][STO] Perform IO reads in uninterruptible threads

Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
---
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
3 files changed, 183 insertions(+), 44 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/87/2387/1

diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
index f704921..00c5dce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
@@ -26,14 +26,20 @@
 /**
  * Represents an index cursor. The expected use
  * cursor = new cursor();
- * while (more predicates){
- * -cursor.open(predicate);
- * -while (cursor.hasNext()){
- * --cursor.next()
+ * try{
+ * -while (more predicates){
+ * --cursor.open(predicate);
+ * --try{
+ * ---while (cursor.hasNext()){
+ * ----cursor.next()
+ * ---}
+ * --} finally{
+ * ---cursor.close();
+ * --}
  * -}
- * -cursor.close();
+ * } finally{
+ * -cursor.destroy();
  * }
- * cursor.destroy();
  * Each created cursor must have destroy called
  * Each successfully opened cursor must have close called
  *
@@ -47,7 +53,8 @@
  * When a cursor object is created, it is in the CLOSED state.
  * CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED, close() --> no effect
  * OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED.
- * DESTROYED: All calls are illegal.
+ * DESTROYED: The only legal call is destroy() which has no effect.
+ *
  * Cursors must enforce the cursor state machine
  */
 public interface IIndexCursor extends IDestroyable {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1443bbc..4bc57fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,11 +28,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -45,8 +48,10 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.InvokeUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -55,6 +60,7 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final int MAP_FACTOR = 3;
+    private static final CachedPage POISON_PILL = new CachedPage();
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
@@ -66,7 +72,8 @@
 
     private final int pageSize;
     private final int maxOpenFiles;
-    final IIOManager ioManager;
+    private final ExecutorService executor;
+    private final IIOManager ioManager;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -75,6 +82,7 @@
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AsyncFIFOPageQueueManager fifoWriter;
     private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
+    private final BlockingQueue<CachedPage> readRequests;
 
     //DEBUG
     private Level fileOpsLevel = Level.DEBUG;
@@ -103,19 +111,43 @@
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
-
-        Executor executor = Executors.newCachedThreadPool(threadFactory);
-        fileInfoMap = new HashMap<>();
-        cleanerThread = new CleanerThread();
-        executor.execute(cleanerThread);
-        closed = false;
-
-        fifoWriter = new AsyncFIFOPageQueueManager(this);
-        if (DEBUG) {
-            confiscatedPages = new ArrayList<>();
-            confiscatedPagesOwner = new HashMap<>();
-            confiscateLock = new ReentrantLock();
-            pinnedPageOwner = new ConcurrentHashMap<>();
+        int numReaders = ioManager.getIODevices().size() * 2;
+        readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
+        executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
+        try {
+            fileInfoMap = new HashMap<>();
+            cleanerThread = new CleanerThread();
+            executor.execute(cleanerThread);
+            for (int i = 0; i < numReaders; i++) {
+                executor.execute(new ReaderThread(i));
+            }
+            closed = false;
+            fifoWriter = new AsyncFIFOPageQueueManager(this);
+            if (DEBUG) {
+                confiscatedPages = new ArrayList<>();
+                confiscatedPagesOwner = new HashMap<>();
+                confiscateLock = new ReentrantLock();
+                pinnedPageOwner = new ConcurrentHashMap<>();
+            }
+        } catch (Throwable th) {
+            try {
+                throw th;
+            } finally {
+                readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty
+                executor.shutdown();
+                try {
+                    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                        LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+                    Thread.currentThread().interrupt();
+                    th.addSuppressed(e);
+                } catch (Throwable e) {
+                    LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e);
+                    th.addSuppressed(e);
+                }
+            }
         }
     }
 
@@ -185,22 +217,22 @@
             // Resolve race of multiple threads trying to read the page from
             // disk.
             synchronized (cPage) {
-                if (!cPage.valid) {
+                if (cPage.state != State.VALID) {
                     try {
-                        tryRead(cPage);
-                        cPage.valid = true;
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
-                        throw e;
-                    } finally {
-                        if (!cPage.valid) {
-                            unpin(cPage);
+                        if (cPage.state == State.INVALID) {
+                            // submit request to read
+                            cPage.state = State.READ_REQUESTED;
+                            readRequests.put(cPage);
                         }
+                        cPage.awaitRead();
+                    } catch (Throwable th) {
+                        unpin(cPage);
+                        throw HyracksDataException.create(th);
                     }
                 }
             }
         } else {
-            cPage.valid = true;
+            cPage.state = State.VALID;
         }
         pageReplacementStrategy.notifyCachePageAccess(cPage);
         if (DEBUG) {
@@ -449,7 +481,7 @@
                     buffer.append("      ").append(cp.cpid).append(" -> [")
                             .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
                             .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
-                            .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                            .append(", ").append(cp.state).append(", ")
                             .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
                             .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
                     cp = cp.next;
@@ -480,7 +512,7 @@
                 if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
                     return false;
                 }
-                if (c.valid) {
+                if (c.state == State.VALID) {
                     reachableDpids.add(c.dpid);
                 }
             }
@@ -670,6 +702,55 @@
         }
     }
 
+    private class ReaderThread implements Runnable {
+        private final int num;
+
+        private ReaderThread(int num) {
+            this.num = num;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                CachedPage next;
+                try {
+                    next = readRequests.take();
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN,
+                            "Reader thread - " + num + " was interrupted. Reader threads should never be interrupted");
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                if (next == POISON_PILL) {
+                    LOGGER.log(Level.INFO, "Exiting BufferCache reader thread - " + num);
+                    InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+                    if (Thread.currentThread().isInterrupted()) {
+                        LOGGER.log(Level.ERROR, "Reader thread - " + num
+                                + " was interrupted. Reader threads should never be interrupted");
+                    }
+                    break;
+                }
+                if (next.state != State.READ_REQUESTED) {
+                    LOGGER.log(Level.ERROR,
+                            "Exiting BufferCache reader thread - " + num + ". Took a page with state = " + next.state);
+                    break;
+                }
+                try {
+                    tryRead(next);
+                    next.state = State.VALID;
+                } catch (HyracksDataException e) {
+                    next.readFailure = e;
+                    next.state = State.READ_FAILED;
+                    LOGGER.log(Level.WARN, "Reader thread - " + num + " failed to read a page", e);
+                }
+                synchronized (next) {
+                    next.notifyAll();
+                }
+            }
+        }
+
+    }
+
     private class CleanerThread implements Runnable {
         private volatile boolean shutdownStart = false;
         private volatile boolean shutdownComplete = false;
@@ -798,6 +879,17 @@
                 }
             });
             fileInfoMap.clear();
+        }
+        InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+        // NOSONAR will always succeed since the queue is empty
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+            }
+        } catch (InterruptedException e) {
+            LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -1307,17 +1399,21 @@
                 finishQueue();
                 if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
                     cycleCount = 0; // suppress warning below
-                    throw new HyracksDataException("Unable to find free page in buffer cache after "
-                            + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)"
-                            + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
-                                    + " successful pins since start of cycle" : ""));
+                    throw new HyracksDataException(
+                            "Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES
+                                    + " cycles (buffer cache undersized?)" + (DEBUG
+                                            ? " ; " + (masterPinCount.get() - startingPinCount)
+                                                    + " successful pins since start of cycle"
+                                            : ""));
                 }
             }
         } finally {
             if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isWarnEnabled()) {
                 LOGGER.warn("Took " + cycleCount + " cycles to find free page in buffer cache.  (buffer cache "
-                        + "undersized?)" + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
-                                + " successful pins since start of cycle" : ""));
+                        + "undersized?)" + (DEBUG
+                                ? " ; " + (masterPinCount.get() - startingPinCount)
+                                        + " successful pins since start of cycle"
+                                : ""));
             }
         }
     }
@@ -1343,7 +1439,7 @@
             }
             try {
                 cPage.reset(cPage.dpid);
-                cPage.valid = true;
+                cPage.state = State.VALID;
                 cPage.next = bucket.cachedPage;
                 bucket.cachedPage = cPage;
                 cPage.pinCount.decrementAndGet();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index bc0a04e..d7a55af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,19 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * @author yingyib
  */
 public class CachedPage implements ICachedPageInternal {
+    public enum State {
+        INVALID,
+        READ_REQUESTED,
+        READ_FAILED,
+        VALID
+    }
+
     final int cpid;
     ByteBuffer buffer;
     public final AtomicInteger pinCount;
@@ -36,7 +45,7 @@
     private final IPageReplacementStrategy pageReplacementStrategy;
     volatile long dpid; // disk page id (composed of file id and page id)
     CachedPage next;
-    volatile boolean valid;
+    volatile State state;
     final AtomicBoolean confiscated;
     private IQueueInfo queueInfo;
     private int multiplier;
@@ -44,6 +53,7 @@
     // DEBUG
     private static final boolean DEBUG = false;
     private final StackTraceElement[] ctorStack;
+    Throwable readFailure;
 
     //Constructor for making dummy entry for FIFO queue
     public CachedPage() {
@@ -72,7 +82,7 @@
         latch = new ReentrantReadWriteLock(true);
         replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
         dpid = -1;
-        valid = false;
+        state = State.INVALID;
         confiscated = new AtomicBoolean(false);
         queueInfo = null;
         ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -81,7 +91,7 @@
     public void reset(long dpid) {
         this.dpid = dpid;
         dirty.set(false);
-        valid = false;
+        state = State.INVALID;
         confiscated.set(false);
         pageReplacementStrategy.notifyCachePageReset(this);
         queueInfo = null;
@@ -205,4 +215,30 @@
     public boolean isLargePage() {
         return multiplier > 1;
     }
+
+    /**
+     * Wait for the page requested to be read to complete the read operation
+     * This method is uninterruptible
+     *
+     * @throws HyracksDataException
+     */
+    public synchronized void awaitRead() throws HyracksDataException {
+        boolean interrupted = false;
+        try {
+            while (state != State.VALID && state != State.READ_FAILED) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+            if (state == State.READ_FAILED) {
+                throw HyracksDataException.create(readFailure);
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2387
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message