drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject drill git commit: DRILL-2178: Update outgoing record batch size and allocation in PartitionSender.
Date Sun, 01 Mar 2015 19:21:58 GMT
Repository: drill
Updated Branches:
  refs/heads/master c8a241b06 -> 9c0738d94


DRILL-2178: Update outgoing record batch size and allocation in PartitionSender.

Also:
 + Add setInitialCapacity() method to ValueVector interface to set the initial capacity
   of memory allocated in the first allocateNew() call.
 + Send an empty batch for fast schema instead of flushing the OutgoingRecordBatches,
   which throws away the allocated memory and reallocates it again.
 + Remove the v.getValueVector().makeTransferPair(outgoingVector) hack, as the child
   schema population bug for complex schemas is fixed in DRILL-1885.
 + Cleanup/refactor PartitionSender related code.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c0738d9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c0738d9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c0738d9

Branch: refs/heads/master
Commit: 9c0738d94f246443554a1b6d03c7a8d4d21d29a8
Parents: c8a241b
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Fri Feb 6 13:20:07 2015 -0800
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Sun Mar 1 10:30:39 2015 -0800

----------------------------------------------------------------------
 .../codegen/templates/FixedValueVectors.java    |   4 +
 .../codegen/templates/NullableValueVectors.java |   6 +
 .../codegen/templates/RepeatedValueVectors.java |   6 +
 .../templates/VariableLengthVectors.java        |   9 +-
 .../impl/materialize/QueryWritableBatch.java    |  16 --
 .../PartitionSenderRootExec.java                |  46 ++---
 .../partitionsender/PartitionerTemplate.java    | 156 ++++++++---------
 .../exec/record/FragmentWritableBatch.java      |  18 +-
 .../drill/exec/record/MaterializedField.java    |  13 +-
 .../org/apache/drill/exec/vector/BitVector.java |   5 +
 .../apache/drill/exec/vector/ObjectVector.java  |   6 +
 .../drill/exec/vector/RepeatedVector.java       |   3 +-
 .../apache/drill/exec/vector/ValueVector.java   |   6 +
 .../drill/exec/vector/complex/MapVector.java    |   7 +
 .../exec/vector/complex/RepeatedListVector.java |   7 +
 .../exec/vector/complex/RepeatedMapVector.java  |   7 +
 .../exec/record/vector/TestValueVector.java     | 175 +++++++++----------
 17 files changed, 253 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 52a3868..b5011e6 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -63,6 +63,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements
F
     return mutator;
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    allocationValueCount = numRecords;
+  }
 
   public void allocateNew() {
     if(!allocateNewSafe()){

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index ba7c629..1618a99 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -100,6 +100,12 @@ public final class ${className} extends BaseValueVector implements <#if
type.maj
     return values;
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    bits.setInitialCapacity(numRecords);
+    values.setInitialCapacity(numRecords);
+  }
+
   <#if type.major == "VarLen">
   @Override
   public SerializedField getMetadata() {

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index d39040e..ee40cc2 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -174,6 +174,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector
implemen
       }
     }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    offsets.setInitialCapacity(numRecords + 1);
+    values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
+  }
+
   public boolean allocateNewSafe(){
     if(!offsets.allocateNewSafe()) return false;
     offsets.zeroVector();

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index f854d32..93557e2 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -50,7 +50,8 @@ package org.apache.drill.exec.vector;
 public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
-  private static final int INITIAL_BYTE_COUNT = 32768;
+  private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
+  private static final int INITIAL_BYTE_COUNT = 4096 * DEFAULT_RECORD_BYTE_COUNT;
   private static final int MIN_BYTE_COUNT = 4096;
   
   private final UInt${type.width}Vector offsetVector;
@@ -241,6 +242,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements
V
     }
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    allocationTotalByteCount = numRecords * DEFAULT_RECORD_BYTE_COUNT;
+    offsetVector.setInitialCapacity(numRecords + 1);
+  }
+
   public void allocateNew() {
     if(!allocateNewSafe()){
       throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index cef4101..e6c3fba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -64,20 +64,4 @@ public class QueryWritableBatch {
   public String toString() {
     return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers)
+ "]";
   }
-
-  public static QueryWritableBatch getEmptyBatchWithSchema(QueryId queryId, int rowCount,
boolean isLastChunk, BatchSchema schema) {
-    List<SerializedField> fields = Lists.newArrayList();
-    for (MaterializedField field : schema) {
-      fields.add(field.getAsBuilder().build());
-    }
-    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
-    QueryResult header = QueryResult.newBuilder() //
-            .setQueryId(queryId) //
-            .setRowCount(rowCount) //
-            .setDef(def) //
-            .setIsLastChunk(isLastChunk) //
-            .build();
-    return new QueryWritableBatch(header);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index f09acaa..a23bd7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -90,7 +90,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException
{
     super(context, new OperatorContext(operator, context, null, false), operator);
-    //super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
@@ -101,24 +100,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
   }
 
-  private boolean done() {
-    for (int i = 0; i < remainingReceivers.length(); i++) {
-      if (remainingReceivers.get(i) == 0) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private void buildSchema() throws SchemaChangeException {
-    createPartitioner();
-    try {
-      partitioner.flushOutgoingBatches(false, true);
-    } catch (IOException e) {
-      throw new SchemaChangeException(e);
-    }
-  }
-
   @Override
   public boolean innerNext() {
 
@@ -147,7 +128,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           if(partitioner != null) {
             partitioner.flushOutgoingBatches(true, false);
           } else {
-            sendEmptyBatch();
+            sendEmptyBatch(true);
           }
         } catch (IOException e) {
           incoming.kill(false);
@@ -170,10 +151,11 @@ public class PartitionSenderRootExec extends BaseRootExec {
             partitioner.clear();
           }
           createPartitioner();
-          // flush to send schema downstream
+
           if (first) {
+            // Send an empty batch for fast schema
             first = false;
-            partitioner.flushOutgoingBatches(false, true);
+            sendEmptyBatch(false);
           }
         } catch (IOException e) {
           incoming.kill(false);
@@ -233,7 +215,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
     try {
       // compile and setup generated code
-//      partitioner = context.getImplementationClassMultipleOutput(cg);
       partitioner = context.getImplementationClass(cg);
       partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler);
 
@@ -285,27 +266,28 @@ public class PartitionSenderRootExec extends BaseRootExec {
     incoming.cleanup();
   }
 
-  public void sendEmptyBatch() {
+  public void sendEmptyBatch(boolean isLast) {
     FragmentHandle handle = context.getHandle();
     int fieldId = 0;
     StatusHandler statusHandler = new StatusHandler(sendCount, context);
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
       DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
-      FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyLastWithSchema(
-              handle.getQueryId(),
-              handle.getMajorFragmentId(),
-              handle.getMinorFragmentId(),
-              operator.getOppositeMajorFragmentId(),
-              fieldId,
-              incoming.getSchema());
+      FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
+          isLast,
+          handle.getQueryId(),
+          handle.getMajorFragmentId(),
+          handle.getMinorFragmentId(),
+          operator.getOppositeMajorFragmentId(),
+          fieldId,
+          incoming.getSchema());
       stats.startWait();
       try {
         tunnel.sendRecordBatch(statusHandler, writableBatch);
       } finally {
         stats.stopWait();
       }
-      this.sendCount.increment();
+      sendCount.increment();
       fieldId++;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 4292c09..71ffd41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -41,7 +41,6 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -52,19 +51,20 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public abstract class PartitionerTemplate implements Partitioner {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class);
 
+  // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in
ValueVectors
+  private static final int DEFAULT_RECORD_BATCH_SIZE = (1 << 10) - 1;
+
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private RecordBatch incoming;
   private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
 
-  private static final String REWRITE_MSG = "Failed to write the record {} in available space.
Attempting to rewrite.";
-  private static final String RECORD_TOO_BIG_MSG = "Record {} is too big to fit into the
allocated memory of ValueVector.";
+  private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;
 
   public PartitionerTemplate() throws SchemaChangeException {
   }
@@ -86,6 +86,13 @@ public abstract class PartitionerTemplate implements Partitioner {
     this.incoming = incoming;
     doSetup(context, incoming, null);
 
+    // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce
the total amount of memory
+    // allocated.
+    if (popConfig.getDestinations().size() > 1000) {
+      // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation
in ValueVectors
+      outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
+    }
+
     int fieldId = 0;
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
@@ -131,7 +138,7 @@ public abstract class PartitionerTemplate implements Partitioner {
       if (isLastBatch) {
         batch.setIsLast();
       }
-      batch.flush();
+      batch.flush(schemaChanged);
       if (schemaChanged) {
         batch.resetBatch();
         batch.initializeBatch();
@@ -202,18 +209,14 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final VectorContainer vectorContainer = new VectorContainer();
     private final SendingAccountor sendCount;
     private final int oppositeMinorFragmentId;
+    private final StatusHandler statusHandler;
+    private final OperatorStats stats;
 
     private boolean isLast = false;
-    private boolean isFirst = true;
     private volatile boolean terminated = false;
     private boolean dropAll = false;
-    private BatchSchema outSchema;
     private int recordCount;
     private int totalRecords;
-    private OperatorStats stats;
-    private static final int DEFAULT_RECORD_BATCH_SIZE = 1000;
-
-    private final StatusHandler statusHandler;
 
     public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender
operator, DataTunnel tunnel,
                                FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId,
@@ -232,8 +235,8 @@ public abstract class PartitionerTemplate implements Partitioner {
       doEval(inIndex, recordCount);
       recordCount++;
       totalRecords++;
-      if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
-        flush();
+      if (recordCount == outgoingRecordBatchSize) {
+        flush(false);
       }
     }
 
@@ -248,109 +251,97 @@ public abstract class PartitionerTemplate implements Partitioner {
     @RuntimeOverridden
     protected void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex)
{ };
 
-    public void flush() throws IOException {
+    public void flush(boolean schemaChanged) throws IOException {
       if (dropAll) {
         vectorContainer.zeroVectors();
         return;
       }
       final FragmentHandle handle = context.getHandle();
 
-      if (recordCount != 0 && !terminated) {
+      // We need to send the last batch when
+      //   1. we are actually done processing the incoming RecordBatches and no more input
available
+      //   2. receiver wants to terminate (possible in case of queries involving limit clause)
+      final boolean isLastBatch = isLast || terminated;
+
+      // if the batch is not the last batch and the current recordCount is zero, then no
need to send any RecordBatches
+      if (!isLastBatch && recordCount == 0) {
+        return;
+      }
 
-        for(VectorWrapper<?> w : vectorContainer){
+      if (recordCount != 0) {
+        for (VectorWrapper<?> w : vectorContainer) {
           w.getValueVector().getMutator().setValueCount(recordCount);
         }
+      }
 
-        FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
-                handle.getQueryId(),
-                handle.getMajorFragmentId(),
-                handle.getMinorFragmentId(),
-                operator.getOppositeMajorFragmentId(),
-                oppositeMinorFragmentId,
-                getWritableBatch());
-
-        updateStats(writableBatch);
-        stats.startWait();
-        try {
-          tunnel.sendRecordBatch(statusHandler, writableBatch);
-        } finally {
-          stats.stopWait();
-        }
-        this.sendCount.increment();
-      } else {
-        logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last
batch)" : ""));
-        if (isFirst || isLast || terminated) {
-          // send final (empty) batch
-          FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast || terminated,
-                  handle.getQueryId(),
-                  handle.getMajorFragmentId(),
-                  handle.getMinorFragmentId(),
-                  operator.getOppositeMajorFragmentId(),
-                  oppositeMinorFragmentId,
-                  getWritableBatch());
-          stats.startWait();
-          try {
-            tunnel.sendRecordBatch(statusHandler, writableBatch);
-          } finally {
-            stats.stopWait();
-          }
-          this.sendCount.increment();
-          vectorContainer.zeroVectors();
-          if (!isFirst) {
-            dropAll = true;
-          }
-          if (isFirst) {
-            isFirst = !isFirst;
-          }
-          return;
-        }
+      FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLastBatch,
+          handle.getQueryId(),
+          handle.getMajorFragmentId(),
+          handle.getMinorFragmentId(),
+          operator.getOppositeMajorFragmentId(),
+          oppositeMinorFragmentId,
+          getWritableBatch());
+
+      updateStats(writableBatch);
+      stats.startWait();
+      try {
+        tunnel.sendRecordBatch(statusHandler, writableBatch);
+      } finally {
+        stats.stopWait();
       }
+      sendCount.increment();
 
-      // reset values and reallocate the buffer for each value vector based on the incoming
batch.
-      // NOTE: the value vector is directly referenced by generated code; therefore references
-      // must remain valid.
-      recordCount = 0;
-      vectorContainer.zeroVectors();
-      for (VectorWrapper<?> v : vectorContainer) {
-        v.getValueVector().allocateNew();
+      // If the current batch is the last batch, then set a flag to ignore any requests to
flush the data
+      // This is possible when the receiver is terminated, but we still get data from input
operator
+      if (isLastBatch) {
+        dropAll = true;
       }
+
+      // If this flush is not due to schema change, allocate space for existing vectors.
+      if (!schemaChanged) {
+        // reset values and reallocate the buffer for each value vector based on the incoming
batch.
+        // NOTE: the value vector is directly referenced by generated code; therefore references
+        // must remain valid.
+        recordCount = 0;
+        vectorContainer.zeroVectors();
+        allocateOutgoingRecordBatch();
+      }
+
       if (!statusHandler.isOk()) {
         throw new IOException(statusHandler.getException());
       }
     }
 
+    private void allocateOutgoingRecordBatch() {
+      for (VectorWrapper<?> v : vectorContainer) {
+        v.getValueVector().allocateNew();
+      }
+    }
+
     public void updateStats(FragmentWritableBatch writableBatch) {
       stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
       stats.addLongStat(Metric.BATCHES_SENT, 1);
       stats.addLongStat(Metric.RECORDS_SENT, writableBatch.getHeader().getDef().getRecordCount());
     }
 
+    /**
+     * Initialize the OutgoingBatch based on the current schema in incoming RecordBatch
+     */
     public void initializeBatch() {
-      isLast = false;
-      vectorContainer.clear();
-
-      SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
       for (VectorWrapper<?> v : incoming) {
-
-        // add field to the output schema
-        bldr.addField(v.getField());
-
-        // allocate a new value vector
+        // create new vector
         ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
-        v.getValueVector().makeTransferPair(outgoingVector);
-        outgoingVector.allocateNew();
+        outgoingVector.setInitialCapacity(outgoingRecordBatchSize);
         vectorContainer.add(outgoingVector);
       }
-      outSchema = bldr.build();
+      allocateOutgoingRecordBatch();
       doSetup(incoming, vectorContainer);
     }
 
     public void resetBatch() {
       isLast = false;
       recordCount = 0;
-      for (VectorWrapper<?> v : vectorContainer){
-        v.getValueVector().clear();
-      }
+      vectorContainer.clear();
     }
 
     public void setIsLast() {
@@ -359,8 +350,7 @@ public abstract class PartitionerTemplate implements Partitioner {
 
     @Override
     public BatchSchema getSchema() {
-      Preconditions.checkNotNull(outSchema);
-      return outSchema;
+      return incoming.getSchema();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index d122311..3d06806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -66,24 +66,20 @@ public class FragmentWritableBatch{
 
   public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId,
                                                              int receiveMajorFragmentId,
int receiveMinorFragmentId, BatchSchema schema){
-
-    List<SerializedField> fields = Lists.newArrayList();
-    for (MaterializedField field : schema) {
-      fields.add(field.getAsBuilder().build());
-    }
-    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
-    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId, receiveMinorFragmentId, def);
+    return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId,
+        receiveMinorFragmentId, schema);
   }
 
-  public static FragmentWritableBatch getEmptyBatchWithSchema(QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId,
-                                                             int receiveMajorFragmentId,
int receiveMinorFragmentId, BatchSchema schema){
+  public static FragmentWritableBatch getEmptyBatchWithSchema(boolean isLast, QueryId queryId,
int sendMajorFragmentId,
+      int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema
schema){
 
     List<SerializedField> fields = Lists.newArrayList();
     for (MaterializedField field : schema) {
-      fields.add(field.getAsBuilder().build());
+      fields.add(field.getSerializedField());
     }
     RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
-    return new FragmentWritableBatch(false, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId, receiveMinorFragmentId, def);
+    return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId,
+        receiveMinorFragmentId, def);
   }
 
   public ByteBuf[] getBuffers(){

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index ef53d2a..64ba861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
@@ -52,6 +51,18 @@ public class MaterializedField {
     return field;
   }
 
+  /**
+   * Create and return a serialized field based on the current state.
+   */
+  public SerializedField getSerializedField() {
+    SerializedField.Builder serializedFieldBuilder = getAsBuilder();
+    for(MaterializedField childMaterializedField : getChildren()) {
+      serializedFieldBuilder.addChild(childMaterializedField.getSerializedField());
+    }
+    return serializedFieldBuilder.build();
+  }
+
+
   public SerializedField.Builder getAsBuilder(){
     return SerializedField.newBuilder() //
         .setMajorType(key.type) //

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index f6644bd..a152a0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -69,6 +69,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     return (int) Math.floor(index / 8.0);
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    allocationValueCount = numRecords;
+  }
+
   public void allocateNew() {
     if (!allocateNewSafe()) {
       throw new OutOfMemoryRuntimeException();

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 3c15db3..ac050e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -92,6 +92,12 @@ public class ObjectVector extends BaseValueVector{
     public void generateTestData(int values) {
     }
   }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    // NoOp
+  }
+
   @Override
   public void allocateNew() throws OutOfMemoryRuntimeException {
     addNewArray();

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
index b23ee02..2c2ff54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
@@ -17,9 +17,8 @@
  */
 package org.apache.drill.exec.vector;
 
-import io.netty.buffer.DrillBuf;
-
 public interface RepeatedVector {
+  public static final int DEFAULT_REPEAT_PER_RECORD = 4;
 
   public RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index df6a486..42e25e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -48,6 +48,12 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    */
   public boolean allocateNewSafe();
 
+  /**
+   * Set the initial record capacity to be used by the next {@code allocateNew()}
+   * call, letting callers pre-size the vector's buffers instead of relying on
+   * the default allocation size.
+   *
+   * @param numRecords expected number of records this vector will hold
+   */
+  public void setInitialCapacity(int numRecords);
+
   public int getBufferSize();
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index c5dc5ba..b995462 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -93,6 +93,13 @@ public class MapVector extends AbstractMapVector {
   }
 
   @Override
+  public void setInitialCapacity(int numRecords) {
+    // A map holds one entry per record, so simply propagate the requested
+    // capacity to every child vector.
+    for (final ValueVector child : this) {
+      child.setInitialCapacity(numRecords);
+    }
+  }
+
+  @Override
   public int getBufferSize() {
     if (valueCount == 0 || size() == 0) {
       return 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 131f2a3..cc68181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -100,6 +100,13 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     return mutator;
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    // Offsets need one entry per record plus one leading entry.
+    offsets.setInitialCapacity(numRecords + 1);
+    // The inner vector is created lazily once the child type is known, so it
+    // may still be null at this point; size it for the average repeat count.
+    if (vector != null) {
+      vector.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
+    }
+  }
+
   @Override
   public boolean allocateNewSafe() {
     if (!offsets.allocateNewSafe()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index eb045d0..c7e7cba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -69,6 +69,13 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     this.emptyPopulator = new EmptyValuePopulator(offsets);
   }
 
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    // Offsets need one entry per record plus one leading entry.
+    offsets.setInitialCapacity(numRecords + 1);
+    // Children hold the flattened repeated values: size each for the
+    // average number of repeats per record.
+    for (final ValueVector v : this) {
+      v.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
+    }
+  }
+
   @Override
   public void allocateNew(int groupCount, int valueCount) {
     clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/9c0738d9/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 2bed433..1564aea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -23,34 +23,50 @@ import static org.junit.Assert.assertEquals;
 import java.nio.charset.Charset;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableUInt4Holder;
+import org.apache.drill.exec.expr.holders.NullableVar16CharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.RepeatedFloat4Holder;
+import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
+import org.apache.drill.exec.expr.holders.RepeatedVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.UInt4Holder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.VariableWidthVector.VariableWidthAccessor;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.junit.Test;
 
 public class TestValueVector extends ExecTest {
+  private final static SchemaPath EMPTY_SCHEMA_PATH = SchemaPath.getSimplePath("");
+
+  private final static byte[] STR1 = new String("AAAAA1").getBytes(Charset.forName("UTF-8"));
+  private final static byte[] STR2 = new String("BBBBBBBBB2").getBytes(Charset.forName("UTF-8"));
+  private final static byte[] STR3 = new String("CCCC3").getBytes(Charset.forName("UTF-8"));
 
   TopLevelAllocator allocator = new TopLevelAllocator();
 
   @Test
   public void testFixedType() {
-    // Build a required uint field definition
-    MajorType.Builder typeBuilder = MajorType.newBuilder();
-    typeBuilder
-        .setMinorType(MinorType.UINT4)
-        .setMode(DataMode.REQUIRED)
-        .setWidth(4);
-        MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(""), typeBuilder.build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
     UInt4Vector v = new UInt4Vector(field, allocator);
@@ -73,34 +89,21 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testNullableVarLen2() {
-    // Build an optional varchar field definition
-    MajorType.Builder typeBuilder = MajorType.newBuilder();
-    SerializedField.Builder defBuilder = SerializedField.newBuilder();
-    typeBuilder
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .setWidth(2);
-    defBuilder
-        .setMajorType(typeBuilder.build());
-    MaterializedField field = MaterializedField.create(defBuilder.build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
     NullableVarCharVector v = new NullableVarCharVector(field, allocator);
     NullableVarCharVector.Mutator m = v.getMutator();
     v.allocateNew(1024*10, 1024);
 
-    // Create and set 3 sample strings
-    String str1 = new String("AAAAA1");
-    String str2 = new String("BBBBBBBBB2");
-    String str3 = new String("CCCC3");
-    m.set(0, str1.getBytes(Charset.forName("UTF-8")));
-    m.set(1, str2.getBytes(Charset.forName("UTF-8")));
-    m.set(2, str3.getBytes(Charset.forName("UTF-8")));
+    m.set(0, STR1);
+    m.set(1, STR2);
+    m.set(2, STR3);
 
     // Check the sample strings
-    assertEquals(str1, new String(v.getAccessor().get(0), Charset.forName("UTF-8")));
-    assertEquals(str2, new String(v.getAccessor().get(1), Charset.forName("UTF-8")));
-    assertEquals(str3, new String(v.getAccessor().get(2), Charset.forName("UTF-8")));
+    assertArrayEquals(STR1, v.getAccessor().get(0));
+    assertArrayEquals(STR2, v.getAccessor().get(1));
+    assertArrayEquals(STR3, v.getAccessor().get(2));
 
     // Ensure null value throws
     boolean b = false;
@@ -119,16 +122,7 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testNullableFixedType() {
-    // Build an optional uint field definition
-    MajorType.Builder typeBuilder = MajorType.newBuilder();
-    SerializedField.Builder defBuilder = SerializedField.newBuilder();
-    typeBuilder
-        .setMinorType(MinorType.UINT4)
-        .setMode(DataMode.OPTIONAL)
-        .setWidth(4);
-    defBuilder
-        .setMajorType(typeBuilder.build());
-    MaterializedField field = MaterializedField.create(defBuilder.build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableUInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
     NullableUInt4Vector v = new NullableUInt4Vector(field, allocator);
@@ -206,16 +200,7 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testNullableFloat() {
-    // Build an optional float field definition
-    MajorType.Builder typeBuilder = MajorType.newBuilder();
-    SerializedField.Builder defBuilder = SerializedField.newBuilder();
-    typeBuilder
-        .setMinorType(MinorType.FLOAT4)
-        .setMode(DataMode.OPTIONAL)
-        .setWidth(4);
-    defBuilder
-        .setMajorType(typeBuilder.build());
-    MaterializedField field = MaterializedField.create(defBuilder.build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
     NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
@@ -265,16 +250,7 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testBitVector() {
-    // Build a required boolean field definition
-    MajorType.Builder typeBuilder = MajorType.newBuilder();
-    SerializedField.Builder defBuilder = SerializedField.newBuilder();
-    typeBuilder
-        .setMinorType(MinorType.BIT)
-        .setMode(DataMode.REQUIRED)
-        .setWidth(4);
-    defBuilder
-        .setMajorType(typeBuilder.build());
-    MaterializedField field = MaterializedField.create(defBuilder.build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, BitHolder.TYPE);
 
     // Create a new value vector for 1024 integers
     BitVector v = new BitVector(field, allocator);
@@ -312,16 +288,7 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testReAllocNullableFixedWidthVector() throws Exception {
-    // Build an optional float field definition
-    MajorType floatType = MajorType.newBuilder()
-        .setMinorType(MinorType.FLOAT4)
-        .setMode(DataMode.OPTIONAL)
-        .setWidth(4).build();
-
-    MaterializedField field = MaterializedField.create(
-        SerializedField.newBuilder()
-            .setMajorType(floatType)
-            .build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
     NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
@@ -355,16 +322,7 @@ public class TestValueVector extends ExecTest {
 
   @Test
   public void testReAllocNullableVariableWidthVector() throws Exception {
-    // Build an optional float field definition
-    MajorType floatType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .setWidth(4).build();
-
-    MaterializedField field = MaterializedField.create(
-        SerializedField.newBuilder()
-            .setMajorType(floatType)
-            .build());
+    MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
     NullableVarCharVector v = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator);
@@ -374,25 +332,60 @@ public class TestValueVector extends ExecTest {
     int initialCapacity = v.getValueCapacity();
 
     // Put values in indexes that fall within the initial allocation
-    byte[] str1 = new String("AAAAA1").getBytes(Charset.forName("UTF-8"));
-    byte[] str2 = new String("BBBBBBBBB2").getBytes(Charset.forName("UTF-8"));
-    byte[] str3 = new String("CCCC3").getBytes(Charset.forName("UTF-8"));
-
-    m.setSafe(0, str1, 0, str1.length);
-    m.setSafe(initialCapacity - 1, str2, 0, str2.length);
+    m.setSafe(0, STR1, 0, STR1.length);
+    m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
 
     // Now try to put values in space that falls beyond the initial allocation
-    m.setSafe(initialCapacity + 200, str3, 0, str3.length);
+    m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
 
     // Check valueCapacity is more than initial allocation
     assertEquals((initialCapacity+1)*2-1, v.getValueCapacity());
 
-    assertArrayEquals(str1, v.getAccessor().get(0));
-    assertArrayEquals(str2, v.getAccessor().get(initialCapacity-1));
-    assertArrayEquals(str3, v.getAccessor().get(initialCapacity + 200));
+    assertArrayEquals(STR1, v.getAccessor().get(0));
+    assertArrayEquals(STR2, v.getAccessor().get(initialCapacity-1));
+    assertArrayEquals(STR3, v.getAccessor().get(initialCapacity + 200));
 
    // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
    // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
     m.setValueCount(v.getValueCapacity() + 200);
   }
+
+  /**
+   * Verifies that setInitialCapacity() is honored by allocateNew() across fixed,
+   * variable-width, nullable, repeated and complex (map/list) vector types.
+   */
+  @Test
+  public void testVVInitialCapacity() {
+    final MaterializedField[] fields = new MaterializedField[9];
+
+    fields[0] = MaterializedField.create(EMPTY_SCHEMA_PATH, BitHolder.TYPE);
+    fields[1] = MaterializedField.create(EMPTY_SCHEMA_PATH, IntHolder.TYPE);
+    fields[2] = MaterializedField.create(EMPTY_SCHEMA_PATH, VarCharHolder.TYPE);
+    fields[3] = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVar16CharHolder.TYPE);
+    fields[4] = MaterializedField.create(EMPTY_SCHEMA_PATH, RepeatedFloat4Holder.TYPE);
+    fields[5] = MaterializedField.create(EMPTY_SCHEMA_PATH, RepeatedVarBinaryHolder.TYPE);
+
+    fields[6] = MaterializedField.create(EMPTY_SCHEMA_PATH, MapVector.TYPE);
+    fields[6].addChild(fields[0] /*bit*/);
+    fields[6].addChild(fields[2] /*varchar*/);
+
+    fields[7] = MaterializedField.create(EMPTY_SCHEMA_PATH, RepeatedMapVector.TYPE);
+    fields[7].addChild(fields[1] /*int*/);
+    fields[7].addChild(fields[3] /*optional var16char*/);
+
+    fields[8] = MaterializedField.create(EMPTY_SCHEMA_PATH, RepeatedListVector.TYPE);
+    fields[8].addChild(fields[1] /*int*/);
+
+    // Derive the vector array length from fields so the two cannot drift apart.
+    final ValueVector[] valueVectors = new ValueVector[fields.length];
+    final int initialCapacity = 1024;
+
+    try {
+      for (int i = 0; i < valueVectors.length; i++) {
+        valueVectors[i] = TypeHelper.getNewVector(fields[i], allocator);
+        valueVectors[i].setInitialCapacity(initialCapacity);
+        valueVectors[i].allocateNew();
+      }
+
+      for (final ValueVector vv : valueVectors) {
+        final int vvCapacity = vv.getValueCapacity();
+        assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
+            initialCapacity, vvCapacity);
+      }
+    } finally {
+      // Release the direct memory allocated above so the test does not leak
+      // buffers from the shared allocator.
+      for (final ValueVector v : valueVectors) {
+        if (v != null) {
+          v.clear();
+        }
+      }
+    }
+  }
 }


Mime
View raw message