asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1883: FeedRuntimeInputHandler issues
Date Wed, 12 Apr 2017 16:43:16 GMT
Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1677

Change subject: ASTERIXDB-1883: FeedRuntimeInputHandler issues
......................................................................

ASTERIXDB-1883: FeedRuntimeInputHandler issues

Recent commit https://asterix-gerrit.ics.uci.edu/#/c/1591/ includes a
number of new issues in FeedRuntimeInputHandler:
- hangs caused by race condition with mutex & inbox on close (observed
  on Jenkins)
- CPU spin on disk spilling on empty inbox
- The writer is not flushed in as many cases as before

Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
1 file changed, 70 insertions(+), 132 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/77/1677/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 329451d..2c549ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -51,7 +52,9 @@
     private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
     private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
     private static final boolean DEBUG = false;
-    private final Object mutex = new Object();
+    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
+    private static final ByteBuffer SPILLED = ByteBuffer.allocate(0);
+
     private final FeedExceptionHandler exceptionHandler;
     private final FrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
@@ -59,7 +62,7 @@
     private final int initialFrameSize;
     private final FrameTransporter consumer;
     private final Thread consumerThread;
-    private final LinkedBlockingQueue<ByteBuffer> inbox;
+    private final BlockingQueue<ByteBuffer> inbox;
     private final ConcurrentFramePool framePool;
     private Mode mode = Mode.PROCESS;
     private int total = 0;
@@ -110,28 +113,23 @@
             // If we use nextframe, chances are this frame will also be
             // flushed into the spilled file. This causes problem when trying to
             // read the frame and the size info is lost.
-            inbox.put(ByteBuffer.allocate(0));
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
+            inbox.put(POISON_PILL);
             consumerThread.join();
         } catch (InterruptedException e) {
-            LOGGER.log(Level.WARNING, e.getMessage(), e);
+            LOGGER.log(Level.WARNING, "interrupted", e);
+            Thread.currentThread().interrupt();
         }
         try {
             framePool.release(inbox);
         } catch (Throwable th) {
-            LOGGER.log(Level.WARNING, th.getMessage(), th);
+            LOGGER.log(Level.WARNING, "exception releasing buffers", th);
         }
         try {
             if (spiller != null) {
                 spiller.close();
             }
         } catch (Throwable th) {
-            LOGGER.log(Level.WARNING, th.getMessage(), th);
+            LOGGER.log(Level.WARNING, "exception closing spiller", th);
         } finally {
             writer.close();
         }
@@ -163,8 +161,11 @@
                     }
                     break;
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
         } catch (Throwable th) {
-            throw new HyracksDataException(th);
+            throw HyracksDataException.create(th);
         }
     }
 
@@ -182,7 +183,7 @@
         }
     }
 
-    private void discard(ByteBuffer frame) throws HyracksDataException {
+    private void discard(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
         if (DEBUG) {
             LOGGER.info("starting discard(frame)");
         }
@@ -206,7 +207,7 @@
                 }
                 numProcessedInMemory++;
                 next.put(frame);
-                inbox.offer(next);
+                inbox.put(next);
                 mode = Mode.PROCESS;
                 return;
             }
@@ -224,7 +225,7 @@
         }
     }
 
-    private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+    private void exitProcessState(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
         if (fpa.spillToDiskOnCongestion()) {
             mode = Mode.SPILL;
             spiller.open();
@@ -237,7 +238,7 @@
         }
     }
 
-    private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
+    private void discardOrStall(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
         if (fpa.discardOnCongestion()) {
             mode = Mode.DISCARD;
             discard(frame);
@@ -249,48 +250,33 @@
         }
     }
 
-    private void stall(ByteBuffer frame) throws HyracksDataException {
-        try {
+    private void stall(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
+        if (DEBUG) {
+            LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+        }
+        numStalled++;
+        // If spilling is enabled, we wait on the spiller
+        if (fpa.spillToDiskOnCongestion()) {
             if (DEBUG) {
-                LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+                LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
             }
-            numStalled++;
-            // If spilling is enabled, we wait on the spiller
-            if (fpa.spillToDiskOnCongestion()) {
-                if (DEBUG) {
-                    LOGGER.info("in stall(frame). Spilling is enabled so we will attempt
to spill");
-                }
-                waitforSpillSpace();
-                spiller.spill(frame);
-                numSpilled++;
-                synchronized (mutex) {
-                    if (DEBUG) {
-                        LOGGER.info("Producer is waking up consumer");
-                    }
-                    mutex.notify();
-                }
-                return;
-            }
-            if (DEBUG) {
-                LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to
frame pool");
-            }
-            // Spilling is disabled, we subscribe to feedMemoryManager
-            frameAction.setFrame(frame);
-            framePool.subscribe(frameAction);
-            ByteBuffer temp = frameAction.retrieve();
-            inbox.put(temp);
-            numProcessedInMemory++;
-            if (DEBUG) {
-                LOGGER.info("stall(frame) has been completed. Notifying the consumer that
a frame is ready");
-            }
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
+            waitforSpillSpace();
+            spiller.spill(frame);
+            numSpilled++;
+            inbox.put(SPILLED);
+            return;
+        }
+        if (DEBUG) {
+            LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame
pool");
+        }
+        // Spilling is disabled, we subscribe to feedMemoryManager
+        frameAction.setFrame(frame);
+        framePool.subscribe(frameAction);
+        ByteBuffer temp = frameAction.retrieve();
+        inbox.put(temp);
+        numProcessedInMemory++;
+        if (DEBUG) {
+            LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame
is ready");
         }
     }
 
@@ -307,19 +293,14 @@
         }
     }
 
-    private void process(ByteBuffer frame) throws HyracksDataException {
+    private void process(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
         // Get a page from frame pool
         ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity())
: null;
         if (next != null) {
             // Got a page from memory pool
             numProcessedInMemory++;
             next.put(frame);
-            try {
-                inbox.put(next);
-                notifyMemoryConsumer();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
+            inbox.put(next);
         } else {
             if (DEBUG) {
                 LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
@@ -329,46 +310,29 @@
         }
     }
 
-    private void notifyMemoryConsumer() {
-        if (inbox.size() == 1) {
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
-        }
-    }
-
-    private void spill(ByteBuffer frame) throws HyracksDataException {
+    private void spill(ByteBuffer frame) throws HyracksDataException, InterruptedException
{
         if (spiller.switchToMemory()) {
-            synchronized (mutex) {
-                // Check if there is memory
-                ByteBuffer next = null;
-                if (frame.capacity() <= framePool.getMaxFrameSize()) {
-                    next = getFreeBuffer(frame.capacity());
-                }
-                if (next != null) {
-                    spiller.close();
-                    numProcessedInMemory++;
-                    next.put(frame);
-                    inbox.offer(next);
-                    notifyMemoryConsumer();
-                    mode = Mode.PROCESS;
-                } else {
-                    // spill. This will always succeed since spilled = 0 (TODO must verify
that budget can't be 0)
-                    spiller.spill(frame);
-                    numSpilled++;
-                    if (DEBUG) {
-                        LOGGER.info("Producer is waking up consumer");
-                    }
-                    mutex.notify();
-                }
+            // Check if there is memory
+            ByteBuffer next = null;
+            if (frame.capacity() <= framePool.getMaxFrameSize()) {
+                next = getFreeBuffer(frame.capacity());
+            }
+            if (next != null) {
+                spiller.close();
+                numProcessedInMemory++;
+                next.put(frame);
+                inbox.put(next);
+                mode = Mode.PROCESS;
+            } else {
+                // spill. This will always succeed since spilled = 0 (TODO must verify that
budget can't be 0)
+                spiller.spill(frame);
+                numSpilled++;
+                inbox.put(SPILLED);
             }
         } else {
             // try to spill. If failed switch to either discard or stall
             if (spiller.spill(frame)) {
-                notifyDiskConsumer();
+                inbox.put(SPILLED);
                 numSpilled++;
             } else {
                 if (fpa.discardOnCongestion()) {
@@ -377,17 +341,6 @@
                 } else {
                     stall(frame);
                 }
-            }
-        }
-    }
-
-    private void notifyDiskConsumer() {
-        if (spiller.remaining() == 1) {
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
             }
         }
     }
@@ -458,39 +411,24 @@
             try {
                 ByteBuffer frame;
                 boolean running = true;
-                while (running) {
-                    frame = inbox.poll();
+                for (; running; writer.flush()) {
+                    frame = inbox.take();
 
-                    if (frame == null && spiller != null) {
+                    if (frame == SPILLED) {
                         running = clearLocalFrames();
-                        continue;
-                    }
-
-                    if (frame == null) {
-                        synchronized (mutex) {
-                            LOGGER.info("Consumer is going to sleep");
-                            mutex.wait();
-                            LOGGER.info("Consumer is waking up");
-                        }
-                        continue;
-                    }
-
-                    // process
-                    if (frame.capacity() == 0) {
+                    } else if (frame == POISON_PILL) {
                         running = false;
                         if (spiller != null ) {
                             clearLocalFrames();
                         }
                     } else {
+                        // process
                         try {
-                            if (consume(frame) != null) {
-                                return;
-                            }
+                            running = consume(frame) == null;
                         } finally {
                             framePool.release(frame);
                         }
                     }
-                    writer.flush();
                 }
             } catch (Throwable th) {
                 this.cause = th;
@@ -507,7 +445,7 @@
         return total;
     }
 
-    public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() {
+    public BlockingQueue<ByteBuffer> getInternalBuffer() {
         return inbox;
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1677
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mblow@apache.org>

Mime
View raw message