drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [3/3] drill git commit: DRILL-2851: handle oversized allocation requests; ensure flatten splits a batch if data is oversized; add unit tests
Date Mon, 06 Jul 2015 23:28:58 GMT
DRILL-2851: handle oversized allocation requests; ensure flatten splits a batch if data is oversized; add unit tests


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b2bbd994
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b2bbd994
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b2bbd994

Branch: refs/heads/master
Commit: b2bbd9941be6b132a83d27c0ae02c935e1dec5dd
Parents: 0a27a03
Author: Hanifi Gunes <hgunes@maprtech.com>
Authored: Fri Jun 19 22:38:06 2015 -0700
Committer: Hanifi Gunes <hgunes@maprtech.com>
Committed: Mon Jul 6 16:28:29 2015 -0700

----------------------------------------------------------------------
 .../src/main/codegen/includes/vv_imports.ftl    |   2 +
 .../codegen/templates/FixedValueVectors.java    |  68 ++-
 .../codegen/templates/NullableValueVectors.java |  16 +-
 .../templates/VariableLengthVectors.java        |  65 ++-
 .../exception/OversizedAllocationException.java |  51 ++
 .../impl/flatten/FlattenRecordBatch.java        |   4 +-
 .../physical/impl/flatten/FlattenTemplate.java  |  70 +--
 .../exec/physical/impl/flatten/Flattener.java   |   2 +-
 .../drill/exec/vector/BaseDataValueVector.java  |   2 +-
 .../drill/exec/vector/BaseValueVector.java      |   8 +-
 .../org/apache/drill/exec/vector/BitVector.java |  57 +-
 .../exec/record/vector/TestValueVector.java     | 522 +++++++++++--------
 12 files changed, 539 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 92c8007..733e0a5 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -30,6 +30,8 @@ import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.vector.*;
+import org.apache.drill.common.exceptions.*;
+import org.apache.drill.exec.exception.*;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.types.TypeProtos.*;

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 7103a17..e8a4d5f 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -48,7 +48,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
+  private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION * ${type.width};
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
@@ -73,8 +73,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationValueCount = numRecords;
+  public void setInitialCapacity(final int valueCount) {
+    final long size = 1L * valueCount * ${type.width};
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+    allocationSizeInBytes = (int)size;
   }
 
   public void allocateNew() {
@@ -84,42 +88,50 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationValueCount = Math.max(8, (int) (allocationValueCount / 2));
+      curAllocationSize = Math.max(8, curAllocationSize / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationValueCount = (int) (allocationValueCount * 2);
+      curAllocationSize = allocationSizeInBytes * 2L;
       allocationMonitor = 0;
     }
 
-    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
-    if(newBuf == null) {
+    try{
+      allocateBytes(curAllocationSize);
+    } catch (DrillRuntimeException ex) {
       return false;
     }
-
-    this.data = newBuf;
-    this.data.readerIndex(0);
     return true;
   }
 
   /**
-   * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
+   * Allocate a new buffer that supports setting at least the provided number of values. May actually be sized bigger
+   * depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
+   *
+   * Note that the maximum number of values a vector can allocate is Integer.MAX_VALUE / value width.
+   *
    * @param valueCount
    * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
    */
-  public void allocateNew(int valueCount) {
-    clear();
+  public void allocateNew(final int valueCount) {
+    allocateBytes(valueCount * ${type.width});
+  }
 
-    DrillBuf newBuf = allocator.buffer(valueCount * ${type.width});
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(
-        String.format("Failure while allocating buffer of %d bytes",valueCount * ${type.width}));
+  private void allocateBytes(final long size) {
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
 
-    this.data = newBuf;
-    this.data.readerIndex(0);
-    this.allocationValueCount = valueCount;
+    final int curSize = (int)size;
+    clear();
+    final DrillBuf newBuf = allocator.buffer(curSize);
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", size));
+    }
+    data = newBuf;
+    data.readerIndex(0);
+    allocationSizeInBytes = curSize;
   }
 
 /**
@@ -128,12 +140,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
  * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
  */
   public void reAlloc() {
-    logger.info("Realloc vector {}. [{}] -> [{}]", field, allocationValueCount * ${type.width}, allocationValueCount * 2 * ${type.width});
-    allocationValueCount *= 2;
-    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
+    final long newAllocationSize = allocationSizeInBytes * 2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
+      throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
+    }
+
+    logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", field, allocationSizeInBytes, newAllocationSize);
+    final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(
-      String.format("Failure while reallocating buffer to %d bytes",allocationValueCount * ${type.width}));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while reallocating buffer to %d bytes", newAllocationSize));
     }
 
     newBuf.setBytes(0, data, 0, data.capacity());
@@ -141,6 +156,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     newBuf.writerIndex(data.writerIndex());
     data.release();
     data = newBuf;
+    allocationSizeInBytes = (int)newAllocationSize;
   }
 
   public void zeroVector() {

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 7f83542..7fa0d55 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -131,9 +131,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      */
     boolean success = false;
     try {
-      if(!values.allocateNewSafe()) return false;
-      if(!bits.allocateNewSafe()) return false;
-      success = true;
+      success = values.allocateNewSafe() && bits.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -142,7 +140,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
-    return true;
+    return success;
   }
 
   @Override
@@ -150,7 +148,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     try {
       values.allocateNew(totalBytes, valueCount);
       bits.allocateNew(valueCount);
-    } catch(OutOfMemoryRuntimeException e){
+    } catch(DrillRuntimeException e) {
       clear();
       throw e;
     }
@@ -196,7 +194,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     try {
       values.allocateNew();
       bits.allocateNew();
-    } catch(OutOfMemoryRuntimeException e) {
+    } catch(DrillRuntimeException e) {
       clear();
       throw e;
     }
@@ -215,9 +213,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      */
     boolean success = false;
     try {
-      if(!values.allocateNewSafe()) return false;
-      if(!bits.allocateNewSafe()) return false;
-      success = true;
+      success = values.allocateNewSafe() && bits.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -226,7 +222,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
-    return true;
+    return success;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 50ae770..2c2e6b6 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -69,7 +69,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   private final UInt${type.width}Vector.Accessor oAccessor;
 
 
-  private int allocationTotalByteCount = INITIAL_BYTE_COUNT;
+  private int allocationSizeInBytes = INITIAL_BYTE_COUNT;
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
@@ -264,9 +264,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationTotalByteCount = numRecords * DEFAULT_RECORD_BYTE_COUNT;
-    offsetVector.setInitialCapacity(numRecords + 1);
+  public void setInitialCapacity(final int valueCount) {
+    final long size = 1L * valueCount * ${type.width};
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+    allocationSizeInBytes = (int)size;
+    offsetVector.setInitialCapacity(valueCount + 1);
   }
 
   public void allocateNew() {
@@ -277,15 +281,20 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   @Override
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationTotalByteCount = Math.max(MIN_BYTE_COUNT, (int) (allocationTotalByteCount / 2));
+      curAllocationSize = Math.max(MIN_BYTE_COUNT, curAllocationSize / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationTotalByteCount = (int) (allocationTotalByteCount * 2);
+      curAllocationSize = curAllocationSize * 2L;
       allocationMonitor = 0;
     }
 
+    if (curAllocationSize > MAX_ALLOCATION_SIZE) {
+      return false;
+    }
+
+    clear();
     /* Boolean to keep track if all the memory allocations were successful
      * Used in the case of composite vectors when we need to allocate multiple
      * buffers for multiple vectors. If one of the allocations failed we need to
@@ -293,15 +302,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      */
     boolean success = false;
     try {
-      DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
+      final int requestedSize = (int)curAllocationSize;
+      DrillBuf newBuf = allocator.buffer(requestedSize);
       if (newBuf == null) {
         return false;
       }
       this.data = newBuf;
-      if (!offsetVector.allocateNewSafe()) {
-        return false;
-      }
-      success = true;
+      success = offsetVector.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -309,40 +316,44 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
     data.readerIndex(0);
     offsetVector.zeroVector();
-    return true;
+    return success;
   }
 
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
     assert totalBytes >= 0;
     try {
-      DrillBuf newBuf = allocator.buffer(totalBytes);
+      final DrillBuf newBuf = allocator.buffer(totalBytes);
       if (newBuf == null) {
         throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes));
       }
-      this.data = newBuf;
+      data = newBuf;
       offsetVector.allocateNew(valueCount + 1);
-    } catch (OutOfMemoryRuntimeException e) {
+    } catch (DrillRuntimeException e) {
       clear();
       throw e;
     }
     data.readerIndex(0);
-    allocationTotalByteCount = totalBytes;
+    allocationSizeInBytes = totalBytes;
     offsetVector.zeroVector();
   }
 
-    public void reAlloc() {
-      allocationTotalByteCount *= 2;
-      DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
-      if(newBuf == null){
-        throw new OutOfMemoryRuntimeException(
-          String.format("Failure while reallocating buffer of %d bytes", allocationTotalByteCount));
-      }
+  public void reAlloc() {
+    final long newAllocationSize = allocationSizeInBytes*2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
+      throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
+    }
 
-      newBuf.setBytes(0, data, 0, data.capacity());
-      data.release();
-      data = newBuf;
+    final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
+    if(newBuf == null) {
+      throw new OutOfMemoryRuntimeException(
+        String.format("Failure while reallocating buffer of %d bytes", newAllocationSize));
     }
+    newBuf.setBytes(0, data, 0, data.capacity());
+    data.release();
+    data = newBuf;
+    allocationSizeInBytes = (int)newAllocationSize;
+  }
 
   public void decrementAllocationMonitor() {
     if (allocationMonitor > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
new file mode 100644
index 0000000..f5ae70c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * An exception that is used to signal that allocation request in bytes is greater than the maximum allowed by
+ * {@link org.apache.drill.exec.memory.BufferAllocator#buffer(int) allocator}.
+ *
+ * <p>Operators should handle this exception to split the batch and later resume the execution on the next
+ * {@link RecordBatch#next() iteration}.</p>
+ *
+ */
+public class OversizedAllocationException extends DrillRuntimeException {
+  public OversizedAllocationException() {
+    super();
+  }
+
+  public OversizedAllocationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public OversizedAllocationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public OversizedAllocationException(String message) {
+    super(message);
+  }
+
+  public OversizedAllocationException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index b8daceb..491ced3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -150,7 +150,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     setFlattenVector();
 
     int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
-    int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0);
+    int outputRecords = flattener.flattenRecords(incomingRecordCount, 0);
     // TODO - change this to be based on the repeated vector length
     if (outputRecords < childCount) {
       setValueCount(outputRecords);
@@ -181,7 +181,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       return;
     }
 
-    int projRecords = flattener.flattenRecords(remainderIndex, remainingRecordCount, 0);
+    int projRecords = flattener.flattenRecords(remainingRecordCount, 0);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
       this.recordCount = projRecords;

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index de67b62..a0d82dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -32,9 +33,11 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class FlattenTemplate implements Flattener {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class);
+  private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
 
   private static final int OUTPUT_BATCH_SIZE = 4*1024;
 
@@ -48,14 +51,11 @@ public abstract class FlattenTemplate implements Flattener {
 
   // this allows for groups to be written between batches if we run out of space, for cases where we have finished
   // a batch on the boundary it will be set to 0
-  private int childIndexWithinCurrGroup;
-  // calculating the current group size requires reading the start and end out of the offset vector, this only happens
-  // once and is stored here for faster reference
-  private int currGroupSize;
-  private int childIndex;
+  private int innerValueIndex;
+  private int currentInnerValueIndex;
 
   public FlattenTemplate() throws SchemaChangeException {
-    childIndexWithinCurrGroup = -1;
+    innerValueIndex = -1;
   }
 
   @Override
@@ -69,8 +69,7 @@ public abstract class FlattenTemplate implements Flattener {
   }
 
   @Override
-  public final int flattenRecords(int startIndex, final int recordCount, int firstOutputIndex) {
-    startIndex = childIndex;
+  public final int flattenRecords(final int recordCount, final int firstOutputIndex) {
     switch (svMode) {
       case FOUR_BYTE:
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
@@ -79,35 +78,47 @@ public abstract class FlattenTemplate implements Flattener {
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
 
       case NONE:
-        if (childIndexWithinCurrGroup == -1) {
-          childIndexWithinCurrGroup = 0;
+        if (innerValueIndex == -1) {
+          innerValueIndex = 0;
         }
+        final int initialInnerValueIndex = currentInnerValueIndex;
+        // restore state to local stack
+        int valueIndexLocal = valueIndex;
+        int innerValueIndexLocal = innerValueIndex;
+        int currentInnerValueIndexLocal = currentInnerValueIndex;
         outer: {
+          int outputIndex = firstOutputIndex;
           final int valueCount = accessor.getValueCount();
-          for ( ; valueIndex < valueCount; valueIndex++) {
-            currGroupSize = accessor.getInnerValueCountAt(valueIndex);
-            for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
-              if (firstOutputIndex == OUTPUT_BATCH_SIZE) {
+          for ( ; valueIndexLocal < valueCount; valueIndexLocal++) {
+            final int innerValueCount = accessor.getInnerValueCountAt(valueIndexLocal);
+            for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
+              if (outputIndex == OUTPUT_BATCH_SIZE) {
                 break outer;
               }
-              doEval(valueIndex, firstOutputIndex);
-              firstOutputIndex++;
-              childIndex++;
+              try {
+                doEval(valueIndexLocal, outputIndex);
+              } catch (OversizedAllocationException ex) {
+                // unable to flatten due to a soft buffer overflow. split the batch here and resume execution.
+                logger.debug("Reached allocation limit. Splitting the batch at input index: {} - inner index: {} - current completed index: {}",
+                    valueIndexLocal, innerValueIndexLocal, currentInnerValueIndexLocal) ;
+                break outer;
+              }
+              outputIndex++;
+              currentInnerValueIndexLocal++;
             }
-            childIndexWithinCurrGroup = 0;
+            innerValueIndexLocal = 0;
           }
         }
-//        System.out.println(String.format("startIndex %d, recordCount %d, firstOutputIndex: %d, currGroupSize: %d, childIndexWithinCurrGroup: %d, groupIndex: %d", startIndex, recordCount, firstOutputIndex, currGroupSize, childIndexWithinCurrGroup, groupIndex));
-//        try{
-////          Thread.sleep(1000);
-//        }catch(Exception e){
-//
-//        }
-
+        // save state to heap
+        valueIndex = valueIndexLocal;
+        innerValueIndex = innerValueIndexLocal;
+        currentInnerValueIndex = currentInnerValueIndexLocal;
+        // transfer the computed range
+        final int delta = currentInnerValueIndexLocal - initialInnerValueIndex;
         for (TransferPair t : transfers) {
-          t.splitAndTransfer(startIndex, childIndex - startIndex);
+          t.splitAndTransfer(initialInnerValueIndex, delta);
         }
-        return childIndex - startIndex;
+        return delta;
 
       default:
         throw new UnsupportedOperationException();
@@ -133,8 +144,7 @@ public abstract class FlattenTemplate implements Flattener {
   @Override
   public void resetGroupIndex() {
     this.valueIndex = 0;
-    this.currGroupSize = 0;
-    this.childIndex = 0;
+    this.currentInnerValueIndex = 0;
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 92cf79d..d691545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 public interface Flattener {
 
   public abstract void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
-  public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
+  public abstract int flattenRecords(int recordCount, int firstOutputIndex);
   public void setFlattenField(RepeatedValueVector repeatedColumn);
   public RepeatedValueVector getFlattenField();
   public void resetGroupIndex();

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 0e38f3c..579eed6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 
 
-public abstract class BaseDataValueVector extends BaseValueVector{
+public abstract class BaseDataValueVector extends BaseValueVector {
 
   protected DrillBuf data;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 8129668..cc287c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -26,13 +26,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class BaseValueVector implements ValueVector {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseValueVector.class);
+
+  public static final int MAX_ALLOCATION_SIZE = Integer.MAX_VALUE;
+  public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
   protected final BufferAllocator allocator;
   protected final MaterializedField field;
-  public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
   protected BaseValueVector(MaterializedField field, BufferAllocator allocator) {
     this.field = Preconditions.checkNotNull(field, "field cannot be null");

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 10bdf07..1d48043 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.vector;
 import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.NullableBitHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +44,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   private final Mutator mutator = new Mutator();
 
   private int valueCount;
-  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
+  private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION;
   private int allocationMonitor = 0;
 
   public BitVector(MaterializedField field, BufferAllocator allocator) {
@@ -66,7 +67,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
   @Override
   public int getValueCapacity() {
-    return data.capacity() * 8;
+    return (int)Math.min((long)Integer.MAX_VALUE, data.capacity() * 8L);
   }
 
   private int getByteIndex(int index) {
@@ -74,8 +75,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationValueCount = numRecords;
+  public void setInitialCapacity(final int valueCount) {
+    allocationSizeInBytes = getSizeFromCount(valueCount);
   }
 
   public void allocateNew() {
@@ -85,24 +86,20 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationValueCount = Math.max(8, (int) (allocationValueCount / 2));
+      curAllocationSize = Math.max(8, allocationSizeInBytes / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationValueCount = (int) (allocationValueCount * 2);
+      curAllocationSize = allocationSizeInBytes * 2L;
       allocationMonitor = 0;
     }
 
-    clear();
-    int valueSize = getSizeFromCount(allocationValueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
-    if (newBuf == null) {
+    try {
+      allocateBytes(curAllocationSize);
+    } catch (OutOfMemoryRuntimeException ex) {
       return false;
     }
-
-    data = newBuf;
-    zeroVector();
     return true;
   }
 
@@ -113,32 +110,46 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
    *          The number of values which can be contained within this vector.
    */
   public void allocateNew(int valueCount) {
+    final int size = getSizeFromCount(valueCount);
+    allocateBytes(size);
+  }
+
+  private void allocateBytes(final long size) {
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+
+    final int curSize = (int)size;
     clear();
-    int valueSize = getSizeFromCount(valueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
+    final DrillBuf newBuf = allocator.buffer(curSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", valueSize));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", curSize));
     }
-
     data = newBuf;
     zeroVector();
+    allocationSizeInBytes = curSize;
   }
 
   /**
    * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
    */
   public void reAlloc() {
-    allocationValueCount *= 2;
-    int valueSize = getSizeFromCount(allocationValueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
+    final long newAllocationSize = allocationSizeInBytes * 2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+
+    final int curSize = (int)newAllocationSize;
+    final DrillBuf newBuf = allocator.buffer(curSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", valueSize));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", newAllocationSize));
     }
 
     newBuf.setZero(0, newBuf.capacity());
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();
     data = newBuf;
+    allocationSizeInBytes =  curSize;
   }
 
   /**
@@ -154,7 +165,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     clear();
     this.valueCount = valueCount;
     int len = getSizeFromCount(valueCount);
-    data = (DrillBuf) buf.slice(0, len);
+    data = buf.slice(0, len);
     data.retain();
     return len;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 037c8c6..4eeb8f2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.Charset;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
@@ -38,15 +40,19 @@ import org.apache.drill.exec.expr.holders.UInt4Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestValueVector extends ExecTest {
@@ -56,29 +62,124 @@ public class TestValueVector extends ExecTest {
   private final static byte[] STR2 = new String("BBBBBBBBB2").getBytes(Charset.forName("UTF-8"));
   private final static byte[] STR3 = new String("CCCC3").getBytes(Charset.forName("UTF-8"));
 
-  TopLevelAllocator allocator = new TopLevelAllocator();
+  private TopLevelAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new TopLevelAllocator();
+  }
+
+  @After
+  public void terminate() {
+    allocator.close();
+  }
+
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testFixedVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final UInt4Vector vector = new UInt4Vector(field, allocator);
+    // edge case 1: buffer size = max value capacity
+    final int expectedValueCapacity = BaseValueVector.MAX_ALLOCATION_SIZE / 4;
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      assertEquals(expectedValueCapacity, vector.getValueCapacity());
+      vector.reAlloc();
+      assertEquals(expectedValueCapacity * 2, vector.getValueCapacity());
+    } finally {
+      vector.close();
+    }
+
+    // common case: value count < max value capacity
+    try {
+      vector.allocateNew(BaseValueVector.MAX_ALLOCATION_SIZE / 8);
+      vector.reAlloc(); // allocation size approaches MAX_ALLOCATION_SIZE
+      vector.reAlloc(); // this should throw an OversizedAllocationException
+    } finally {
+      vector.close();
+    }
+  }
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testBitVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final BitVector vector = new BitVector(field, allocator);
+    // edge case 1: buffer size ~ max value capacity
+    final int expectedValueCapacity = 1 << 29;
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      assertEquals(expectedValueCapacity, vector.getValueCapacity());
+      vector.reAlloc();
+      assertEquals(expectedValueCapacity * 2, vector.getValueCapacity());
+    } finally {
+      vector.close();
+    }
+
+    // common: value count < MAX_VALUE_ALLOCATION
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      for (int i=0; i<3;i++) {
+        vector.reAlloc(); // expand buffer size
+      }
+      assertEquals(Integer.MAX_VALUE, vector.getValueCapacity());
+      vector.reAlloc(); // buffer size ~ max allocation
+      assertEquals(Integer.MAX_VALUE, vector.getValueCapacity());
+      vector.reAlloc(); // overflow
+    } finally {
+      vector.close();
+    }
+  }
+
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testVariableVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final VarCharVector vector = new VarCharVector(field, allocator);
+    // edge case 1: data buffer size in bytes = MAX_ALLOCATION_SIZE
+    final int expectedAllocationInBytes = BaseValueVector.MAX_ALLOCATION_SIZE;
+    final int expectedOffsetSize = 10;
+    try {
+      vector.allocateNew(expectedAllocationInBytes, 10);
+      assertEquals(expectedOffsetSize, vector.getValueCapacity());
+      assertEquals(expectedAllocationInBytes, vector.getBuffer().capacity());
+      vector.reAlloc();
+      assertEquals(expectedOffsetSize * 2, vector.getValueCapacity());
+      assertEquals(expectedAllocationInBytes * 2, vector.getBuffer().capacity());
+    } finally {
+      vector.close();
+    }
+
+    // common: value count < MAX_VALUE_ALLOCATION
+    try {
+      vector.allocateNew(BaseValueVector.MAX_ALLOCATION_SIZE / 2, 0);
+      vector.reAlloc(); // allocation size approaches MAX_ALLOCATION_SIZE
+      vector.reAlloc(); // this tests if it overflows
+    } finally {
+      vector.close();
+    }
+  }
 
   @Test
   public void testFixedType() {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    UInt4Vector v = new UInt4Vector(field, allocator);
-    UInt4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.setSafe(0, 100);
-    m.setSafe(1, 101);
-    m.setSafe(100, 102);
-    m.setSafe(1022, 103);
-    m.setSafe(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
-
+    try (UInt4Vector vector = new UInt4Vector(field, allocator)) {
+      UInt4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.setSafe(0, 100);
+      m.setSafe(1, 101);
+      m.setSafe(100, 102);
+      m.setSafe(1022, 103);
+      m.setSafe(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+    }
   }
 
   @Test
@@ -86,29 +187,29 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableVarCharVector v = new NullableVarCharVector(field, allocator);
-    NullableVarCharVector.Mutator m = v.getMutator();
-    v.allocateNew(1024*10, 1024);
+    try (NullableVarCharVector vector = new NullableVarCharVector(field, allocator)) {
+      NullableVarCharVector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024 * 10, 1024);
 
-    m.set(0, STR1);
-    m.set(1, STR2);
-    m.set(2, STR3);
+      m.set(0, STR1);
+      m.set(1, STR2);
+      m.set(2, STR3);
 
-    // Check the sample strings
-    assertArrayEquals(STR1, v.getAccessor().get(0));
-    assertArrayEquals(STR2, v.getAccessor().get(1));
-    assertArrayEquals(STR3, v.getAccessor().get(2));
+      // Check the sample strings
+      assertArrayEquals(STR1, vector.getAccessor().get(0));
+      assertArrayEquals(STR2, vector.getAccessor().get(1));
+      assertArrayEquals(STR3, vector.getAccessor().get(2));
 
-    // Ensure null value throws
-    boolean b = false;
-    try {
-      v.getAccessor().get(3);
-    } catch(IllegalStateException e) {
-      b = true;
-    }finally{
-      assertTrue(b);
+      // Ensure null value throws
+      boolean b = false;
+      try {
+        vector.getAccessor().get(3);
+      } catch (IllegalStateException e) {
+        b = true;
+      } finally {
+        assertTrue(b);
+      }
     }
-
   }
 
 
@@ -117,68 +218,69 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableUInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableUInt4Vector v = new NullableUInt4Vector(field, allocator);
-    NullableUInt4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 100);
-    m.set(1, 101);
-    m.set(100, 102);
-    m.set(1022, 103);
-    m.set(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
-
-    // Ensure null values throw
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+    try (NullableUInt4Vector vector = new NullableUInt4Vector(field, allocator)) {
+      NullableUInt4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 100);
+      m.set(1, 101);
+      m.set(100, 102);
+      m.set(1022, 103);
+      m.set(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+
+      // Ensure null values throw
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
 
 
-    v.allocateNew(2048);
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(0);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      vector.allocateNew(2048);
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(0);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
-
-    m.set(0, 100);
-    m.set(1, 101);
-    m.set(100, 102);
-    m.set(1022, 103);
-    m.set(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
 
-    // Ensure null values throw
-
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      m.set(0, 100);
+      m.set(1, 101);
+      m.set(100, 102);
+      m.set(1022, 103);
+      m.set(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+
+      // Ensure null values throw
+
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
     }
 
@@ -189,43 +291,44 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
-    NullableFloat4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 100.1f);
-    m.set(1, 101.2f);
-    m.set(100, 102.3f);
-    m.set(1022, 103.4f);
-    m.set(1023, 104.5f);
-    assertEquals(100.1f, v.getAccessor().get(0), 0);
-    assertEquals(101.2f, v.getAccessor().get(1), 0);
-    assertEquals(102.3f, v.getAccessor().get(100), 0);
-    assertEquals(103.4f, v.getAccessor().get(1022), 0);
-    assertEquals(104.5f, v.getAccessor().get(1023), 0);
-
-    // Ensure null values throw
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+    try (NullableFloat4Vector vector = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator)) {
+      NullableFloat4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 100.1f);
+      m.set(1, 101.2f);
+      m.set(100, 102.3f);
+      m.set(1022, 103.4f);
+      m.set(1023, 104.5f);
+      assertEquals(100.1f, vector.getAccessor().get(0), 0);
+      assertEquals(101.2f, vector.getAccessor().get(1), 0);
+      assertEquals(102.3f, vector.getAccessor().get(100), 0);
+      assertEquals(103.4f, vector.getAccessor().get(1022), 0);
+      assertEquals(104.5f, vector.getAccessor().get(1023), 0);
+
+      // Ensure null values throw
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
 
-    v.allocateNew(2048);
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(0);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      vector.allocateNew(2048);
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(0);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
     }
   }
@@ -235,36 +338,37 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, BitHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    BitVector v = new BitVector(field, allocator);
-    BitVector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 1);
-    m.set(1, 0);
-    m.set(100, 0);
-    m.set(1022, 1);
-    assertEquals(1, v.getAccessor().get(0));
-    assertEquals(0, v.getAccessor().get(1));
-    assertEquals(0, v.getAccessor().get(100));
-    assertEquals(1, v.getAccessor().get(1022));
-
-    // test setting the same value twice
-    m.set(0, 1);
-    m.set(0, 1);
-    m.set(1, 0);
-    m.set(1, 0);
-    assertEquals(1, v.getAccessor().get(0));
-    assertEquals(0, v.getAccessor().get(1));
-
-    // test toggling the values
-    m.set(0, 0);
-    m.set(1, 1);
-    assertEquals(0, v.getAccessor().get(0));
-    assertEquals(1, v.getAccessor().get(1));
-
-    // Ensure unallocated space returns 0
-    assertEquals(0, v.getAccessor().get(3));
+    try (BitVector vector = new BitVector(field, allocator)) {
+      BitVector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 1);
+      m.set(1, 0);
+      m.set(100, 0);
+      m.set(1022, 1);
+      assertEquals(1, vector.getAccessor().get(0));
+      assertEquals(0, vector.getAccessor().get(1));
+      assertEquals(0, vector.getAccessor().get(100));
+      assertEquals(1, vector.getAccessor().get(1022));
+
+      // test setting the same value twice
+      m.set(0, 1);
+      m.set(0, 1);
+      m.set(1, 0);
+      m.set(1, 0);
+      assertEquals(1, vector.getAccessor().get(0));
+      assertEquals(0, vector.getAccessor().get(1));
+
+      // test toggling the values
+      m.set(0, 0);
+      m.set(1, 1);
+      assertEquals(0, vector.getAccessor().get(0));
+      assertEquals(1, vector.getAccessor().get(1));
+
+      // Ensure unallocated space returns 0
+      assertEquals(0, vector.getAccessor().get(3));
+    }
   }
 
 
@@ -273,33 +377,34 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
-    NullableFloat4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
+    try (NullableFloat4Vector vector = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator)) {
+      NullableFloat4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
 
-    assertEquals(1024, v.getValueCapacity());
+      assertEquals(1024, vector.getValueCapacity());
 
-    // Put values in indexes that fall within the initial allocation
-    m.setSafe(0, 100.1f);
-    m.setSafe(100, 102.3f);
-    m.setSafe(1023, 104.5f);
+      // Put values in indexes that fall within the initial allocation
+      m.setSafe(0, 100.1f);
+      m.setSafe(100, 102.3f);
+      m.setSafe(1023, 104.5f);
 
-    // Now try to put values in space that falls beyond the initial allocation
-    m.setSafe(2000, 105.5f);
+      // Now try to put values in space that falls beyond the initial allocation
+      m.setSafe(2000, 105.5f);
 
-    // Check valueCapacity is more than initial allocation
-    assertEquals(1024*2, v.getValueCapacity());
+      // Check valueCapacity is more than initial allocation
+      assertEquals(1024 * 2, vector.getValueCapacity());
 
-    assertEquals(100.1f, v.getAccessor().get(0), 0);
-    assertEquals(102.3f, v.getAccessor().get(100), 0);
-    assertEquals(104.5f, v.getAccessor().get(1023), 0);
-    assertEquals(105.5f, v.getAccessor().get(2000), 0);
+      assertEquals(100.1f, vector.getAccessor().get(0), 0);
+      assertEquals(102.3f, vector.getAccessor().get(100), 0);
+      assertEquals(104.5f, vector.getAccessor().get(1023), 0);
+      assertEquals(105.5f, vector.getAccessor().get(2000), 0);
 
 
-    // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
-    // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
-    // vector
-    m.setValueCount(v.getValueCapacity() + 200);
+      // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+      // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
+      // vector
+      m.setValueCount(vector.getValueCapacity() + 200);
+    }
   }
 
   @Test
@@ -307,33 +412,34 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableVarCharVector v = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator);
-    NullableVarCharVector.Mutator m = v.getMutator();
-    v.allocateNew();
+    try (NullableVarCharVector vector = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator)) {
+      NullableVarCharVector.Mutator m = vector.getMutator();
+      vector.allocateNew();
 
-    int initialCapacity = v.getValueCapacity();
+      int initialCapacity = vector.getValueCapacity();
 
-    // Put values in indexes that fall within the initial allocation
-    m.setSafe(0, STR1, 0, STR1.length);
-    m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
+      // Put values in indexes that fall within the initial allocation
+      m.setSafe(0, STR1, 0, STR1.length);
+      m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
 
-    // Now try to put values in space that falls beyond the initial allocation
-    m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
+      // Now try to put values in space that falls beyond the initial allocation
+      m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
 
-    // Check valueCapacity is more than initial allocation
-    assertEquals((initialCapacity+1)*2-1, v.getValueCapacity());
+      // Check valueCapacity is more than initial allocation
+      assertEquals((initialCapacity + 1) * 2 - 1, vector.getValueCapacity());
 
-    assertArrayEquals(STR1, v.getAccessor().get(0));
-    assertArrayEquals(STR2, v.getAccessor().get(initialCapacity-1));
-    assertArrayEquals(STR3, v.getAccessor().get(initialCapacity + 200));
+      assertArrayEquals(STR1, vector.getAccessor().get(0));
+      assertArrayEquals(STR2, vector.getAccessor().get(initialCapacity - 1));
+      assertArrayEquals(STR3, vector.getAccessor().get(initialCapacity + 200));
 
-    // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
-    // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
-    m.setValueCount(v.getValueCapacity() + 200);
+      // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+      // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
+      m.setValueCount(vector.getValueCapacity() + 200);
+    }
   }
 
   @Test
-  public void testVVInitialCapacity() {
+  public void testVVInitialCapacity() throws Exception {
     final MaterializedField[] fields = new MaterializedField[9];
     final ValueVector[] valueVectors = new ValueVector[9];
 
@@ -357,17 +463,21 @@ public class TestValueVector extends ExecTest {
 
     final int initialCapacity = 1024;
 
-    for(int i=0; i<valueVectors.length; i++) {
-      valueVectors[i] = TypeHelper.getNewVector(fields[i], allocator);
-      valueVectors[i].setInitialCapacity(initialCapacity);
-      valueVectors[i].allocateNew();
-    }
+    try {
+      for (int i = 0; i < valueVectors.length; i++) {
+        valueVectors[i] = TypeHelper.getNewVector(fields[i], allocator);
+        valueVectors[i].setInitialCapacity(initialCapacity);
+        valueVectors[i].allocateNew();
+      }
 
-    for(int i=0; i<valueVectors.length; i++) {
-      final ValueVector vv = valueVectors[i];
-      final int vvCapacity = vv.getValueCapacity();
-      assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
-          initialCapacity, vvCapacity);
+      for (int i = 0; i < valueVectors.length; i++) {
+        final ValueVector vv = valueVectors[i];
+        final int vvCapacity = vv.getValueCapacity();
+        assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
+            initialCapacity, vvCapacity);
+      }
+    } finally {
+      AutoCloseables.close(valueVectors);
     }
   }
 }


Mime
View raw message