drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [3/6] drill git commit: DRILL-1813: Fix race condition in UnlimitedRawBatchBuffer
Date Fri, 05 Dec 2014 18:14:18 GMT
DRILL-1813: Fix race condition in UnlimitedRawBatchBuffer

Also change propagated exception to IOException instead of RuntimeException.

Don't mark a fragment as failed if it has been cancelled.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ed30a82
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ed30a82
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ed30a82

Branch: refs/heads/0.7.0
Commit: 0ed30a8245673c63588302698ac69aeb2e15e322
Parents: d15ba12
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Thu Dec 4 21:21:21 2014 -0800
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Fri Dec 5 04:30:46 2014 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentContext.java  | 30 +++++++++++++++-----
 .../exec/rpc/data/DataResponseHandlerImpl.java  |  7 ++++-
 .../apache/drill/exec/rpc/data/DataServer.java  |  2 +-
 .../drill/exec/work/batch/IncomingBuffers.java  | 21 ++++----------
 .../work/batch/UnlimitedRawBatchBuffer.java     |  9 ++++--
 .../apache/drill/exec/work/foreman/Foreman.java |  2 +-
 .../exec/work/fragment/FragmentExecutor.java    |  2 ++
 .../exec/work/fragment/FragmentManager.java     |  6 ++--
 .../work/fragment/NonRootFragmentManager.java   |  2 +-
 .../exec/work/fragment/RootFragmentManager.java |  3 +-
 10 files changed, 53 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 0b99fc4..dc47f4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -29,7 +29,6 @@ import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -79,8 +78,13 @@ public class FragmentContext implements Closeable {
   private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
 
   private volatile Throwable failureCause;
-  private volatile boolean failed = false;
-  private volatile boolean cancelled = false;
+  private volatile FragmentContextState state = FragmentContextState.OK;
+
+  private static enum FragmentContextState {
+    OK,
+    FAILED,
+    CANCELED
+  }
 
   public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection
connection,
       FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException
{
@@ -127,12 +131,24 @@ public class FragmentContext implements Closeable {
 
   public void fail(Throwable cause) {
     logger.error("Fragment Context received failure.", cause);
-    failed = true;
+    setState(FragmentContextState.FAILED);
     failureCause = cause;
   }
 
   public void cancel() {
-    cancelled = true;
+    setState(FragmentContextState.CANCELED);
+  }
+
+  /**
+   * Allowed transitions from left to right: OK -> FAILED -> CANCELED
+   * @param newState
+   */
+  private synchronized void setState(FragmentContextState newState) {
+    if (state == FragmentContextState.OK) {
+      state = newState;
+    } else if (newState == FragmentContextState.CANCELED) {
+      state = newState;
+    }
   }
 
   public DrillbitContext getDrillbitContext() {
@@ -251,11 +267,11 @@ public class FragmentContext implements Closeable {
   }
 
   public boolean isFailed() {
-    return failed;
+    return state == FragmentContextState.FAILED;
   }
 
   public boolean isCancelled() {
-    return cancelled;
+    return state == FragmentContextState.CANCELED;
   }
 
   public FunctionImplementationRegistry getFunctionRegistry() {

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index c37550f..1fcb3e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -31,6 +31,8 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
+import java.io.IOException;
+
 public class DataResponseHandlerImpl implements DataResponseHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
 
@@ -67,5 +69,8 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
     } catch (FragmentSetupException e) {
       logger.error("Failure while attempting to setup new fragment.", e);
       sender.send(new Response(RpcType.ACK, Acks.FAIL));
+    } catch (IOException e) {
+      throw new RpcException(e);
     }
-  }}
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 4cbc8fb..e88455b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -115,7 +115,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
         }
       }else{
         BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-        if (body != null) {
+        if (body != null && !manager.getFragmentContext().isCancelled()) {
           if (!allocator.takeOwnership((DrillBuf) body.unwrap())) {
             dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null);
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 5fa9ce0..b0206f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -61,16 +61,12 @@ public class IncomingBuffers implements AutoCloseable {
     streamsRemaining.set(totalStreams);
   }
 
-  public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException {
+  public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException, IOException
{
     // no need to do anything if we've already enabled running.
     // logger.debug("New Batch Arrived {}", batch);
     if (batch.getHeader().getIsOutOfMemory()) {
       for (DataCollector fSet : fragCounts.values()) {
-        try {
-          fSet.batchArrived(0, batch);
-        } catch (IOException e) {
-          throw new RuntimeException();
-        }
+        fSet.batchArrived(0, batch);
       }
       return false;
     }
@@ -82,15 +78,10 @@ public class IncomingBuffers implements AutoCloseable {
     if (fSet == null) {
       throw new FragmentSetupException(String.format("We received a major fragment id that
we were not expecting.  The id was %d. %s", sendMajorFragmentId, Arrays.toString(fragCounts.values().toArray())));
     }
-    try {
-      synchronized (this) {
-        boolean decremented = fSet.batchArrived(batch.getHeader().getSendingMinorFragmentId(),
batch);
-
-        // we should only return true if remaining required has been decremented and is currently
equal to zero.
-        return decremented && remainingRequired.get() == 0;
-      }
-    } catch (IOException e) {
-      throw new FragmentSetupException(e);
+    synchronized (this) {
+      boolean decremented = fSet.batchArrived(batch.getHeader().getSendingMinorFragmentId(),
batch);
+      // we should only return true if remaining required has been decremented and is currently
equal to zero.
+      return decremented && remainingRequired.get() == 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 722ced0..623a719 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.batch;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -59,12 +60,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public void enqueue(RawFragmentBatch batch) {
+  public void enqueue(RawFragmentBatch batch) throws IOException {
     if (state == BufferState.KILLED) {
       batch.release();
     }
     if (isFinished()) {
-      throw new RuntimeException("Attempted to enqueue batch after finished");
+      if (state == BufferState.KILLED) {
+        batch.release();
+      } else {
+        throw new IOException("Attempted to enqueue batch after finished");
+      }
     }
     if (batch.getHeader().getIsOutOfMemory()) {
       logger.debug("Setting autoread false");

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c26a08f..5efc9fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -191,7 +191,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>
{
       moveToState(QueryState.FAILED, e);
 
     } catch (AssertionError | Exception ex) {
-      moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment
initialization.", ex));
+      moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment
initialization: " + ex.getMessage(), ex));
 
     } catch (OutOfMemoryError e) {
       System.out.println("Out of memory, exiting.");

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2a392d1..27038d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -78,6 +78,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
+    updateState(FragmentState.CANCELLED);
+    context.cancel();
     root.receivingFragmentFinished(handle);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 2ff2ed4..7a819c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -23,6 +23,8 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 
+import java.io.IOException;
+
 /**
  * The Fragment Manager is responsible managing incoming data and executing a fragment. Once
enough data and resources
  * are available, a fragment manager will start a fragment executor to run the associated
fragment.
@@ -34,9 +36,9 @@ public interface FragmentManager {
    *
    * @param batch
    * @return True if the fragment has enough incoming data to be able to be run.
-   * @throws FragmentSetupException
+   * @throws FragmentSetupException, IOException
    */
-  public abstract boolean handle(RawFragmentBatch batch) throws FragmentSetupException;
+  public abstract boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException;
 
   /**
    * Get the fragment runner for this incoming fragment. Note, this can only be requested
once.

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 312f96a..3671804 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -68,7 +68,7 @@ public class NonRootFragmentManager implements FragmentManager {
    * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle,
org.apache.drill.exec.record.RawFragmentBatch)
    */
   @Override
-  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException {
+  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException
{
     return buffers.batchArrived(batch);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0ed30a82/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 75dd923..54fc8c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -44,7 +45,7 @@ public class RootFragmentManager implements FragmentManager{
   }
 
   @Override
-  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException {
+  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException
{
     return buffers.batchArrived(batch);
   }
 


Mime
View raw message