drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/3] drill git commit: DRILL-2757: Verify operators correctly handle low memory conditions and cancellations
Date Sun, 10 May 2015 16:22:57 GMT
Repository: drill
Updated Branches:
  refs/heads/master a29638363 -> a392e5322


http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 4e348bb..330ec79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.record;
 import java.util.Iterator;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -67,7 +68,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator>
implements
   protected static enum BatchState {
     BUILD_SCHEMA, // Need to build schema and return
     FIRST, // This is still the first data batch
-    NOT_FIRST, // The first data batch has alread been returned
+    NOT_FIRST, // The first data batch has already been returned
+    STOP, // The query most likely failed, we need to propagate STOP to the root
+    OUT_OF_MEMORY, // Out of Memory while building the Schema...Ouch!
     DONE // All work is done, no more data to be sent
   }
 
@@ -119,23 +122,21 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator>
implements
   public final IterOutcome next() {
     try {
       stats.startProcessing();
-//      if (state == BatchState.BUILD_SCHEMA) {
-//        buildSchema();
-//        if (state == BatchState.BUILD_SCHEMA.DONE) {
-//          return IterOutcome.NONE;
-//        } else {
-//          state = BatchState.FIRST;
-//          return IterOutcome.OK_NEW_SCHEMA;
-//        }
-//      }
       switch (state) {
         case BUILD_SCHEMA: {
           buildSchema();
-          if (state == BatchState.DONE) {
-            return IterOutcome.NONE;
-          } else {
-            state = BatchState.FIRST;
-            return IterOutcome.OK_NEW_SCHEMA;
+          switch (state) {
+            case DONE:
+              return IterOutcome.NONE;
+            case OUT_OF_MEMORY:
+              // because we don't support schema changes, it is safe to fail the query right
away
+              context.fail(UserException.memoryError().build());
+              // FALL-THROUGH
+            case STOP:
+              return IterOutcome.STOP;
+            default:
+              state = BatchState.FIRST;
+              return IterOutcome.OK_NEW_SCHEMA;
           }
         }
         case DONE: {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 7c77ca2..bf465c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.vector;
 
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+
 public class AllocationHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
 
@@ -44,16 +46,15 @@ public class AllocationHelper {
 
   /**
    * Allocates the exact amount if v is fixed width, otherwise falls back to dynamic allocation
-   * @param v
-   * @param valueCount
-   * @return
+   * @param v value vector we are trying to allocate
+   * @param valueCount  size we are trying to allocate
+   * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate
the memory
    */
-  public static boolean allocateNew(ValueVector v, int valueCount){
+  public static void allocateNew(ValueVector v, int valueCount) {
     if (v instanceof  FixedWidthVector) {
       ((FixedWidthVector) v).allocateNew(valueCount);
-      return true;
     } else {
-      return v.allocateNewSafe();
+      v.allocateNew();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 2fc5bf3..ae5fad5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -96,10 +96,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     clear();
     int valueSize = getSizeFromCount(allocationValueCount);
-    data = allocator.buffer(valueSize);
-    if (data == null) {
+    DrillBuf newBuf = allocator.buffer(valueSize);
+    if (newBuf == null) {
       return false;
     }
+
+    data = newBuf;
     zeroVector();
     return true;
   }
@@ -113,7 +115,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   public void allocateNew(int valueCount) {
     clear();
     int valueSize = getSizeFromCount(valueCount);
-    data = allocator.buffer(valueSize);
+    DrillBuf newBuf = allocator.buffer(valueSize);
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer
of d% bytes.", valueSize));
+    }
+
+    data = newBuf;
     zeroVector();
   }
 
@@ -122,7 +129,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
    */
   public void reAlloc() {
     allocationValueCount *= 2;
-    DrillBuf newBuf = allocator.buffer(getSizeFromCount(allocationValueCount));
+    int valueSize = getSizeFromCount(allocationValueCount);
+    DrillBuf newBuf = allocator.buffer(valueSize);
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer
of %d bytes.", valueSize));
+    }
+
     newBuf.setZero(0, newBuf.capacity());
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index b7ef584..7b36e21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -42,6 +42,8 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
@@ -222,6 +224,8 @@ public class Foreman implements Runnable {
         throw new IllegalStateException();
       }
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
+    } catch (final OutOfMemoryException | OutOfMemoryRuntimeException e) {
+      moveToState(QueryState.FAILED, UserException.memoryError(e).build());
     } catch (final ForemanException e) {
       moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index ddb828c..dc83cc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -207,9 +207,7 @@ public class FragmentExecutor implements Runnable {
       updateState(FragmentState.FINISHED);
     } catch (OutOfMemoryError | OutOfMemoryRuntimeException e) {
       if (!(e instanceof OutOfMemoryError) || "Direct buffer memory".equals(e.getMessage()))
{
-        fail(UserException.resourceError(e)
-            .message("One or more nodes ran out of memory while executing the query.")
-            .build());
+        fail(UserException.memoryError(e).build());
       } else {
         // we have a heap out of memory error. The JVM in unstable, exit.
         System.err.println("Node ran out of Heap memory, exiting.");

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/test/java/org/apache/drill/TestOutOfMemoryOutcome.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOutOfMemoryOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/TestOutOfMemoryOutcome.java
new file mode 100644
index 0000000..b270a8b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestOutOfMemoryOutcome.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Run several tpch queries and inject an OutOfMemoryException in ScanBatch that will cause
an OUT_OF_MEMORY outcome to
+ * be propagated downstream. Make sure the proper "memory error" message is sent to the client.
+ */
+public class TestOutOfMemoryOutcome extends BaseTestQuery{
+
+  private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges`
= true";
+
+  private void testSingleMode(String fileName) throws Exception{
+    test(SINGLE_MODE);
+
+    CoordinationProtos.DrillbitEndpoint endpoint = bits[0].getContext().getEndpoint();
+    String controlsString = "{\"injections\":[{"
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\","
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"" + "org.apache.drill.exec.physical.impl.ScanBatch" + "\","
+      + "\"desc\":\"" + "next-allocate" + "\","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1,"
+      + "\"exceptionClass\":\"" + "org.apache.drill.exec.memory.OutOfMemoryException" + "\""
+      + "}]}";
+    ControlsInjectionUtil.setControls(client, controlsString);
+
+    String query = getFile(fileName);
+
+    try {
+      test(query);
+    } catch(UserException uex) {
+      DrillPBError error = uex.getOrCreatePBError(false);
+      Assert.assertEquals(DrillPBError.ErrorType.RESOURCE, error.getErrorType());
+      Assert.assertTrue("Error message isn't related to memory error",
+        uex.getMessage().contains(UserException.MEMORY_ERROR_MSG));
+    }
+  }
+
+  @Test
+  public void tpch01() throws Exception{
+    testSingleMode("queries/tpch/01.sql");
+  }
+
+  @Test
+  public void tpch03() throws Exception{
+    testSingleMode("queries/tpch/03.sql");
+  }
+
+  @Test
+  public void tpch04() throws Exception{
+    testSingleMode("queries/tpch/04.sql");
+  }
+
+  @Test
+  public void tpch05() throws Exception{
+    testSingleMode("queries/tpch/05.sql");
+  }
+
+  @Test
+  public void tpch06() throws Exception{
+    testSingleMode("queries/tpch/06.sql");
+  }
+
+  @Test
+  public void tpch07() throws Exception{
+    testSingleMode("queries/tpch/07.sql");
+  }
+
+  @Test
+  public void tpch08() throws Exception{
+    testSingleMode("queries/tpch/08.sql");
+  }
+
+  @Test
+  public void tpch09() throws Exception{
+    testSingleMode("queries/tpch/09.sql");
+  }
+
+  @Test
+  public void tpch10() throws Exception{
+    testSingleMode("queries/tpch/10.sql");
+  }
+
+  @Test
+  public void tpch12() throws Exception{
+    testSingleMode("queries/tpch/12.sql");
+  }
+
+  @Test
+  public void tpch13() throws Exception{
+    testSingleMode("queries/tpch/13.sql");
+  }
+
+  @Test
+  public void tpch14() throws Exception{
+    testSingleMode("queries/tpch/14.sql");
+  }
+
+  @Test
+  public void tpch18() throws Exception{
+    testSingleMode("queries/tpch/18.sql");
+  }
+
+  @Test
+  public void tpch20() throws Exception{
+    testSingleMode("queries/tpch/20.sql");
+  }
+}


Mime
View raw message