drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [01/13] drill git commit: DRILL-5601: Rollup of external sort fixes and improvements
Date Tue, 15 Aug 2017 13:44:13 GMT
Repository: drill
Updated Branches:
  refs/heads/master 5c57b50f2 -> 073ea6819


http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index e249c19..d2ff805 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.ops.OperExecContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.BatchSchema;
@@ -90,8 +91,15 @@ public class TestSortImpl extends DrillTest {
           .setQueryId(queryId)
           .build();
     SortConfig sortConfig = new SortConfig(opContext.getConfig());
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder()
+        .setAddress("foo.bar.com")
+        .setUserPort(1234)
+        .setControlPort(1235)
+        .setDataPort(1236)
+        .setVersion("1.11")
+        .build();
     SpillSet spillSet = new SpillSet(opContext.getConfig(), handle,
-                                     popConfig);
+                                     popConfig, ep);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
     return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 4200073..f525020 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -119,6 +119,7 @@ public class TestDrillbitResilience extends DrillTest {
     }
 
     try {
+      @SuppressWarnings("resource")
       final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
       drillbits.put(name, drillbit);
     } catch (final DrillbitStartupException e) {
@@ -132,6 +133,7 @@ public class TestDrillbitResilience extends DrillTest {
    * @param name name of the drillbit
    */
   private static void stopDrillbit(final String name) {
+    @SuppressWarnings("resource")
     final Drillbit drillbit = drillbits.get(name);
     if (drillbit == null) {
       throw new IllegalStateException("No Drillbit named \"" + name + "\" found");
@@ -168,8 +170,8 @@ public class TestDrillbitResilience extends DrillTest {
    * @param name name of the drillbit
    * @return endpoint of the drillbit
    */
+  @SuppressWarnings("resource")
   private static DrillbitEndpoint getEndpoint(final String name) {
-    @SuppressWarnings("resource")
     final Drillbit drillbit = drillbits.get(name);
     if (drillbit == null) {
       throw new IllegalStateException("No Drillbit named \"" + name + "\" found.");
@@ -508,9 +510,11 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the completed state
-   * is not as expected, or if an exception is thrown. The completed state could be COMPLETED or CANCELED. This state
-   * is set when {@link WaitUntilCompleteListener#queryCompleted} is called.
+   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion},
+   * this method fails if the completed state is not as expected, or if an
+   * exception is thrown. The completed state could be COMPLETED or CANCELED.
+   * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is
+   * called.
    */
   private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
     final QueryState actualState = result.getFirst();
@@ -758,8 +762,8 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Test cancelling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
-   * Specifically tests cancelling fragment which has {@link MergingRecordBatch} blocked waiting for data.
+   * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
+   * Specifically tests canceling fragment which has {@link MergingRecordBatch} blocked waiting for data.
    */
   @Test
   @Repeat(count = NUM_RUNS)
@@ -776,8 +780,8 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Test cancelling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
-   * Specifically tests cancelling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
+   * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
+   * Specifically tests canceling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
    */
   @Test
   @Repeat(count = NUM_RUNS)
@@ -931,7 +935,13 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test // DRILL-3065
   public void failsAfterMSorterSorting() {
-    final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
+
+    // Note: must use an input table that returns more than one
+    // batch. The sort uses an optimization for single-batch inputs
+    // which bypasses the code where this particular fault is
+    // injected.
+
+    final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
     final Class<? extends Exception> typeOfException = RuntimeException.class;
 
     final long before = countAllocatedMemory();
@@ -946,7 +956,13 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test // DRILL-3085
   public void failsAfterMSorterSetup() {
-    final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
+
+    // Note: must use an input table that returns more than one
+    // batch. The sort uses an optimization for single-batch inputs
+    // which bypasses the code where this particular fault is
+    // injected.
+
+    final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
     final Class<? extends Exception> typeOfException = RuntimeException.class;
 
     final long before = countAllocatedMemory();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 5a06ec2..4e44464 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -17,8 +17,13 @@
  */
 package org.apache.drill.test;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
 import java.util.List;
 import java.util.Properties;
 
@@ -231,4 +236,119 @@ public class ClientFixture implements AutoCloseable {
   public RowSetBuilder rowSetBuilder(BatchSchema schema) {
     return new RowSetBuilder(allocator(), schema);
   }
+
+  /**
+   * Very simple parser for semi-colon separated lists of SQL statements which
+   * handles quoted semicolons. Drill can execute only one statement at a time
+   * (without a trailing semi-colon.) This parser breaks up a statement list
+   * into single statements. Input:<code><pre>
+   * USE a.b;
+   * ALTER SESSION SET `foo` = ";";
+   * SELECT * FROM bar WHERE x = "\";";
+   * </pre></code>Output:
+   * <ul>
+   * <li><tt>USE a.b</tt></li>
+   * <li><tt>ALTER SESSION SET `foo` = ";"</tt></li>
+   * <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li></ul>
+   */
+
+  public static class StatementParser {
+    private final Reader in;
+    private StringBuilder buf;
+
+    public StatementParser(Reader in) {
+      this.in = in;
+    }
+
+    public String parseNext() throws IOException {
+      boolean eof = false;
+      buf = new StringBuilder();
+      for (;;) {
+        int c = in.read();
+        if (c == -1) {
+          eof = true;
+          break;
+        }
+        if (c == ';') {
+          break;
+        }
+        buf.append((char) c);
+        if (c == '"' || c == '\'' || c == '`') {
+          int quote = c;
+          boolean escape = false;
+          for (;;) {
+            c = in.read();
+            if (c == -1) {
+              throw new IllegalArgumentException("Mismatched quote: " + (char) c);
+            }
+            buf.append((char) c);
+            if (! escape && c == quote) {
+              break;
+            }
+            escape = c == '\\';
+          }
+        }
+      }
+      String stmt = buf.toString().trim();
+      if (stmt.isEmpty() && eof) {
+        return null;
+      }
+      return stmt;
+    }
+  }
+
+  private boolean trace = false;
+
+  public void enableTrace(boolean flag) {
+    this.trace = flag;
+  }
+
+  public int exec(Reader in) throws IOException {
+    StatementParser parser = new StatementParser(in);
+    int count = 0;
+    for (;;) {
+      String stmt = parser.parseNext();
+      if (stmt == null) {
+        if (trace) {
+          System.out.println("----");
+        }
+        return count;
+      }
+      if (stmt.isEmpty()) {
+        continue;
+      }
+      if (trace) {
+        System.out.println("----");
+        System.out.println(stmt);
+      }
+      runSqlSilently(stmt);
+      count++;
+    }
+  }
+
+  /**
+   * Execute a set of statements from a file.
+   * @param source the file containing the set of statements, separated by semicolons
+   * @return the number of statements executed
+   */
+
+  public int exec(File source) throws FileNotFoundException, IOException {
+    try (Reader in = new BufferedReader(new FileReader(source))) {
+      return exec(in);
+    }
+  }
+
+  /**
+   * Execute a set of statements from a string.
+   * @param stmts the set of statements, separated by semicolons
+   * @return the number of statements executed
+   */
+
+  public int exec(String stmts) {
+    try (Reader in = new StringReader(stmts)) {
+      return exec(in);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index 1dafef7..d9e344a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -545,7 +545,7 @@ public class ProfileParser {
 
       p = Pattern.compile("rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)");
       m = p.matcher(tail);
-      if (! m.matches()) {
+      if (! m.find()) {
         throw new IllegalStateException("Could not parse costs: " + tail);
       }
       estRows = Double.parseDouble(m.group(1));

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index f2a27c8..37fcdfd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.test;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -48,6 +53,7 @@ import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+import org.apache.drill.test.ClientFixture.StatementParser;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.RowSetReader;
@@ -230,6 +236,25 @@ public class QueryBuilder {
     return sql(String.format(query, args));
   }
 
+  /**
+   * Parse a single SQL statement (with optional ending semi-colon) from
+   * the file provided.
+   * @param file the file containing exactly one SQL statement, with
+   * optional ending semi-colon
+   * @return this builder
+   */
+
+  public QueryBuilder sql(File file) throws FileNotFoundException, IOException {
+    try (BufferedReader in = new BufferedReader(new FileReader(file))) {
+      StatementParser parser = new StatementParser(in);
+      String sql = parser.parseNext();
+      if (sql == null) {
+        throw new IllegalArgumentException("No query found");
+      }
+      return sql(sql);
+    }
+  }
+
   public QueryBuilder physical(String plan) {
     return query(QueryType.PHYSICAL, plan);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 5139086..a9feafd 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -81,7 +81,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     if (BaseAllocator.DEBUG) {
       historicalLog.recordEvent("create()");
     }
-
   }
 
   public DrillBuf reallocIfNeeded(final int size) {
@@ -184,15 +183,15 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   /**
    * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
    * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
-   * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If
+   * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the target allocator. If
    * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
    * transfer any ownership to the newly created DrillBuf.
-   *
+   * <p>
   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf will either have a
    * reference count of 1 (in the case that this is the first time this memory is being associated with the new
    * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
    * the case that the provided allocator already had an association to this underlying memory.
-   *
+   * <p>
    * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
    * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
    * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
@@ -218,6 +217,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   /**
+   * Visible only for memory allocation calculations.
+   *
+   * @return the ledger held by this buffer
+   */
+  public BufferLedger getLedger() { return ledger; }
+
+  /**
    * The outcome of a Transfer.
    */
   public class TransferResult {
@@ -236,7 +242,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       this.allocationFit = allocationFit;
       this.buffer = buffer;
     }
-
   }
 
   @Override
@@ -269,9 +274,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       throw new IllegalStateException(
           String.format("DrillBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
     }
-
     return refCnt == 0;
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index 34647f9..833a604 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.ops.BufferManager;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -246,7 +247,6 @@ public class AllocationManager {
         owningLedger = target;
         return overlimit;
       }
-
     }
 
     /**
@@ -387,11 +387,10 @@ public class AllocationManager {
       }
 
       return buf;
-
     }
 
     /**
-     * What is the total size (in bytes) of memory underlying this ledger.
+     * The total size (in bytes) of memory underlying this ledger.
      *
      * @return Size in bytes
      */
@@ -400,7 +399,7 @@ public class AllocationManager {
     }
 
     /**
-     * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
+     * Amount of memory accounted for by this ledger. This is either getSize() if this is the owning ledger for the
      * memory or zero in the case that this is not the owning ledger associated with this memory.
      *
      * @return Amount of accounted(owned) memory associated with this ledger.
@@ -418,17 +417,17 @@ public class AllocationManager {
     /**
      * Package visible for debugging/verification only.
      */
-    UnsafeDirectLittleEndian getUnderlying() {
+    @VisibleForTesting
+    protected UnsafeDirectLittleEndian getUnderlying() {
       return underlying;
     }
 
     /**
      * Package visible for debugging/verification only.
      */
-    boolean isOwningLedger() {
+    @VisibleForTesting
+    protected boolean isOwningLedger() {
       return this == owningLedger;
     }
-
   }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 8efa3ee..4e17eda 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -841,6 +841,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   @Override
   public void write(DrillBuf buf, OutputStream out) throws IOException {
+    assert(buf.readerIndex() == 0);
     write(buf, buf.readableBytes(), out);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/includes/vv_imports.ftl b/exec/vector/src/main/codegen/includes/vv_imports.ftl
index 87a2106..efca346 100644
--- a/exec/vector/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/vector/src/main/codegen/includes/vv_imports.ftl
@@ -43,12 +43,14 @@ import org.apache.drill.exec.vector.complex.writer.*;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
 
 import java.util.Arrays;
 import java.util.Random;
 import java.util.List;
+import java.util.Set;
 
 import java.io.Closeable;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 1e83a4f..5a53e21 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -266,8 +266,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   @Override
-  public int getPayloadByteCount() {
-    return getAccessor().getValueCount() * VALUE_WIDTH;
+  public int getPayloadByteCount(int valueCount) {
+    return valueCount * ${type.width};
   }
 
   private class TransferImpl implements TransferPair{

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 0f8d90c..a2c0deb 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
 
 import java.lang.Override;
 import java.lang.UnsupportedOperationException;
+import java.util.Set;
 
 <@pp.dropOutputFile />
 <#list vv.types as type>
@@ -177,15 +179,16 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    return bits.getAllocatedByteCount() + values.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    bits.collectLedgers(ledgers);
+    values.collectLedgers(ledgers);
   }
 
   @Override
-  public int getPayloadByteCount() {
+  public int getPayloadByteCount(int valueCount) {
     // For nullable, we include all values, null or not, in computing
     // the value length.
-    return bits.getPayloadByteCount() + values.getPayloadByteCount();
+    return bits.getPayloadByteCount(valueCount) + values.getPayloadByteCount(valueCount);
   }
 
   <#if type.major == "VarLen">
@@ -225,7 +228,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   public void allocateNew(int valueCount) {
     try {
       values.allocateNew(valueCount);
-      bits.allocateNew(valueCount+1);
+      bits.allocateNew(valueCount);
     } catch(OutOfMemoryException e) {
       clear();
       throw e;

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index 207e55a..2c732f4 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -29,9 +29,12 @@ package org.apache.drill.exec.vector.complex;
 
 <#include "/@includes/vv_imports.ftl" />
 import java.util.Iterator;
+import java.util.Set;
+
 import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.expr.BasicTypeHelper;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 
 /*
  * This class is generated using freemarker and the ${.template_name} template.
@@ -203,19 +206,18 @@ public class UnionVector implements ValueVector {
   }
 
   @Override
-  public int getAllocatedByteCount() {
+  public void collectLedgers(Set<BufferLedger> ledgers) {
     // Most vectors are held inside the internal map.
 
-    int count = internalMap.getAllocatedByteCount();
+    internalMap.collectLedgers(ledgers);
     if (bit != null) {
-      count += bit.getAllocatedByteCount();
+      bit.collectLedgers(ledgers);
     }
-    return count;
   }
 
   @Override
-  public int getPayloadByteCount() {
-    return internalMap.getPayloadByteCount();
+  public int getPayloadByteCount(int valueCount) {
+    return internalMap.getPayloadByteCount(valueCount);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 9a9e178..0eb8906 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -17,8 +17,10 @@
  */
 
 import java.lang.Override;
+import java.util.Set;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
@@ -247,27 +249,26 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    return offsetVector.getAllocatedByteCount() + super.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    offsetVector.collectLedgers(ledgers);
+    super.collectLedgers(ledgers);
   }
 
   @Override
-  public int getPayloadByteCount() {
-    UInt${type.width}Vector.Accessor a = offsetVector.getAccessor();
-    int count = a.getValueCount();
-    if (count == 0) {
+  public int getPayloadByteCount(int valueCount) {
+    if (valueCount == 0) {
       return 0;
-    } else {
-      // If 1 or more values, then the last value is set to
-      // the offset of the next value, which is the same as
-      // the length of existing values.
-      // In addition to the actual data bytes, we must also
-      // include the "overhead" bytes: the offset vector entries
-      // that accompany each column value. Thus, total payload
-      // size is consumed text bytes + consumed offset vector
-      // bytes.
-      return a.get(count-1) + offsetVector.getPayloadByteCount();
     }
+    // If 1 or more values, then the last value is set to
+    // the offset of the next value, which is the same as
+    // the length of existing values.
+    // In addition to the actual data bytes, we must also
+    // include the "overhead" bytes: the offset vector entries
+    // that accompany each column value. Thus, total payload
+    // size is consumed text bytes + consumed offset vector
+    // bytes.
+    return offsetVector.getAccessor().get(valueCount) +
+           offsetVector.getPayloadByteCount(valueCount);
   }
 
   private class TransferImpl implements TransferPair{
@@ -308,7 +309,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     if (size > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
-    allocationSizeInBytes = (int)size;
+    allocationSizeInBytes = (int) size;
     offsetVector.setInitialCapacity(valueCount + 1);
   }
 
@@ -385,7 +386,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
 
-    logger.trace("Reallocating VarChar, new size {}",newAllocationSize);
+    logger.trace("Reallocating VarChar, new size {}", newAllocationSize);
     final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 77d457b..1401373 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,41 +21,42 @@ import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike;
 import org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
 
 public class AllocationHelper {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
 
-  public static void allocate(ValueVector v, int valueCount, int bytesPerValue) {
-    allocate(v, valueCount, bytesPerValue, 5);
+  public static void allocate(ValueVector vector, int valueCount, int bytesPerValue) {
+    allocate(vector, valueCount, bytesPerValue, 5);
   }
 
-  public static void allocatePrecomputedChildCount(ValueVector v, int valueCount, int bytesPerValue, int childValCount){
-    if(v instanceof FixedWidthVector) {
-      ((FixedWidthVector) v).allocateNew(valueCount);
-    } else if (v instanceof VariableWidthVector) {
-      ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
-    } else if(v instanceof RepeatedFixedWidthVectorLike) {
-      ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount);
-    } else if(v instanceof RepeatedVariableWidthVectorLike) {
-      ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
+  public static void allocatePrecomputedChildCount(ValueVector vector, int valueCount, int bytesPerValue, int childValCount) {
+    if (vector instanceof FixedWidthVector) {
+      ((FixedWidthVector) vector).allocateNew(valueCount);
+    } else if (vector instanceof VariableWidthVector) {
+      ((VariableWidthVector) vector).allocateNew(valueCount * bytesPerValue, valueCount);
+    } else if (vector instanceof RepeatedFixedWidthVectorLike) {
+      ((RepeatedFixedWidthVectorLike) vector).allocateNew(valueCount, childValCount);
+    } else if (vector instanceof RepeatedVariableWidthVectorLike && childValCount > 0 && bytesPerValue > 0) {
+      // The full allocateNew asserts if the byte count is zero, so take this
+      // branch only for non-zero counts; otherwise use the default version below.
+      ((RepeatedVariableWidthVectorLike) vector).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
     } else {
-      v.allocateNew();
+      vector.allocateNew();
     }
   }
 
-  public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
-    allocatePrecomputedChildCount(v, valueCount, bytesPerValue, repeatedPerTop * valueCount);
+  public static void allocate(ValueVector vector, int valueCount, int bytesPerValue, int repeatedPerTop){
+    allocatePrecomputedChildCount(vector, valueCount, bytesPerValue, repeatedPerTop * valueCount);
   }
 
   /**
    * Allocates the exact amount if v is fixed width, otherwise falls back to dynamic allocation
-   * @param v value vector we are trying to allocate
+   * @param vector value vector we are trying to allocate
    * @param valueCount  size we are trying to allocate
    * @throws org.apache.drill.exec.memory.OutOfMemoryException if it can't allocate the memory
    */
-  public static void allocateNew(ValueVector v, int valueCount) {
-    if (v instanceof  FixedWidthVector) {
-      ((FixedWidthVector) v).allocateNew(valueCount);
+  public static void allocateNew(ValueVector vector, int valueCount) {
+    if (vector instanceof  FixedWidthVector) {
+      ((FixedWidthVector) vector).allocateNew(valueCount);
     } else {
-      v.allocateNew();
+      vector.allocateNew();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 5ce58ed..e98a417 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -18,7 +18,11 @@
 package org.apache.drill.exec.vector;
 
 import io.netty.buffer.DrillBuf;
+
+import java.util.Set;
+
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.record.MaterializedField;
 
 
@@ -87,9 +91,6 @@ public abstract class BaseDataValueVector extends BaseValueVector {
   public void reset() {}
 
   @Override
-  public int getAllocatedByteCount() { return data.capacity(); }
-
-  @Override
   public void exchange(ValueVector other) {
     BaseDataValueVector target = (BaseDataValueVector) other;
     DrillBuf temp = data;
@@ -99,4 +100,11 @@ public abstract class BaseDataValueVector extends BaseValueVector {
     getMutator().exchange(target.getMutator());
     // No state in an Accessor to reset
   }
+
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    BufferLedger ledger = data.getLedger();
+    if (ledger != null) {
+      ledgers.add(ledger);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 0062e77..a7c81de 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -528,8 +528,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public int getPayloadByteCount() {
-    // One byte per value
-    return valueCount;
+  public int getPayloadByteCount(int valueCount) {
+    return getSizeFromCount(valueCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index bd8566d..3136e32 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -22,9 +22,11 @@ import io.netty.buffer.DrillBuf;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.drill.exec.expr.holders.ObjectHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
@@ -229,13 +231,10 @@ public class ObjectVector extends BaseValueVector {
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    // Values not stored in direct memory?
-    return 0;
-  }
+  public void collectLedgers(Set<BufferLedger> ledgers) {}
 
   @Override
-  public int getPayloadByteCount() {
+  public int getPayloadByteCount(int valueCount) {
     // Values not stored in direct memory?
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 3bc43fa..2926862 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.vector;
 
 import java.io.Closeable;
+import java.util.Set;
 
 import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
@@ -204,16 +206,19 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
   void copyEntry(int toIndex, ValueVector from, int fromIndex);
 
   /**
-   * Return the total memory consumed by all buffers within this vector.
+   * Add the ledgers that back the buffers of this vector's components to the
+   * set provided. Used to determine actual memory allocation.
+   *
+   * @param ledgers set of ledgers to which to add ledgers for this vector
    */
 
-  int getAllocatedByteCount();
+  void collectLedgers(Set<BufferLedger> ledgers);
 
   /**
    * Return the number of value bytes consumed by actual data.
    */
 
-  int getPayloadByteCount();
+  int getPayloadByteCount(int valueCount);
 
   /**
    * Exchange state with another value vector of the same type.

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
index 9a0b6be..5786487 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.vector;
 
 import java.util.Iterator;
+import java.util.Set;
 
 import com.google.common.collect.Iterators;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
@@ -158,11 +160,10 @@ public class ZeroVector implements ValueVector {
   public void copyEntry(int toIndex, ValueVector from, int fromIndex) { }
 
   @Override
-  public int getAllocatedByteCount() { return 0; }
+  public void exchange(ValueVector other) { }
 
-  @Override
-  public int getPayloadByteCount() { return 0; }
+  public void collectLedgers(Set<BufferLedger> ledgers) {}
 
   @Override
-  public void exchange(ValueVector other) { }
+  public int getPayloadByteCount(int valueCount) { return 0; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index baba086..30db41e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -22,11 +22,13 @@ import io.netty.buffer.DrillBuf;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.drill.common.collections.MapWithOrdinal;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.ValueVector;
@@ -117,6 +119,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
    *
    * @return resultant {@link org.apache.drill.exec.vector.ValueVector}
    */
+  @SuppressWarnings("unchecked")
   @Override
   public <T extends ValueVector> T addOrGet(String name, TypeProtos.MajorType type, Class<T> clazz) {
     final ValueVector existing = getChild(name);
@@ -277,21 +280,18 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    int count = 0;
-
+  public void collectLedgers(Set<BufferLedger> ledgers) {
     for (final ValueVector v : vectors.values()) {
-      count += v.getAllocatedByteCount();
+      v.collectLedgers(ledgers);
     }
-    return count;
   }
 
   @Override
-  public int getPayloadByteCount() {
+  public int getPayloadByteCount(int valueCount) {
     int count = 0;
 
     for (final ValueVector v : vectors.values()) {
-      count += v.getPayloadByteCount();
+      count += v.getPayloadByteCount(valueCount);
     }
     return count;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index 5b8f44d..2b41b8b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -21,12 +21,14 @@ import io.netty.buffer.DrillBuf;
 
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
 import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.AddOrGetResult;
@@ -182,6 +184,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     return vector == DEFAULT_DATA_VECTOR ? 0:1;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
     boolean created = false;
@@ -210,13 +213,15 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    return offsets.getAllocatedByteCount() + vector.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    offsets.collectLedgers(ledgers);
+    vector.collectLedgers(ledgers);
   }
 
   @Override
-  public int getPayloadByteCount() {
-    return offsets.getPayloadByteCount() + vector.getPayloadByteCount();
+  public int getPayloadByteCount(int valueCount) {
+    int entryCount = offsets.getAccessor().get(valueCount);
+    return offsets.getPayloadByteCount(valueCount) + vector.getPayloadByteCount(entryCount);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index 7f0e939..c61fd00 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
@@ -41,6 +42,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.drill.exec.vector.complex.writer.FieldWriter;
 
 import java.util.List;
+import java.util.Set;
 
 public class ListVector extends BaseRepeatedValueVector {
 
@@ -323,12 +325,15 @@ public class ListVector extends BaseRepeatedValueVector {
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    return offsets.getAllocatedByteCount() + bits.getAllocatedByteCount() + super.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    offsets.collectLedgers(ledgers);
+    bits.collectLedgers(ledgers);
+    super.collectLedgers(ledgers);
   }
 
   @Override
-  public int getPayloadByteCount() {
-    return offsets.getPayloadByteCount() + bits.getPayloadByteCount() + super.getPayloadByteCount();
+  public int getPayloadByteCount(int valueCount) {
+    return offsets.getPayloadByteCount(valueCount) + bits.getPayloadByteCount(valueCount) +
+           super.getPayloadByteCount(valueCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 969c141..ab2c3d8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -30,6 +31,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedListHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
@@ -415,6 +417,7 @@ public class RepeatedListVector extends AbstractContainerVector
   public void allocateNew(int valueCount, int innerValueCount) {
     clear();
     getOffsetVector().allocateNew(valueCount + 1);
+    getOffsetVector().getMutator().setSafe(0, 0);
     getMutator().reset();
   }
 
@@ -435,14 +438,13 @@ public class RepeatedListVector extends AbstractContainerVector
     copyFromSafe(fromIndex, toIndex, (RepeatedListVector) from);
   }
 
-  @Override
-  public int getAllocatedByteCount() {
-    return delegate.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    delegate.collectLedgers(ledgers);
   }
 
   @Override
-  public int getPayloadByteCount() {
-    return delegate.getPayloadByteCount();
+  public int getPayloadByteCount(int valueCount) {
+    return delegate.getPayloadByteCount(valueCount);
   }
 
   @Override
@@ -450,5 +452,4 @@ public class RepeatedListVector extends AbstractContainerVector
     // TODO: Figure out how to test this scenario, then what to do...
     throw new UnsupportedOperationException("Exchange() not yet supported for repeated lists");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 7ff36a7..be73fc8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -33,6 +34,7 @@ import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -100,7 +102,7 @@ public class RepeatedMapVector extends AbstractMapVector
   public void allocateNew(int groupCount, int innerValueCount) {
     clear();
     try {
-      offsets.allocateNew(groupCount + 1);
+      allocateOffsetsNew(groupCount);
       for (ValueVector v : getChildren()) {
         AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount);
       }
@@ -108,10 +110,14 @@ public class RepeatedMapVector extends AbstractMapVector
       clear();
       throw e;
     }
-    offsets.zeroVector();
     mutator.reset();
   }
 
+  public void allocateOffsetsNew(int groupCount) {
+    offsets.allocateNew(groupCount + 1);
+    offsets.zeroVector();
+  }
+
   public Iterator<String> fieldNameIterator() {
     return getChildFieldNames().iterator();
   }
@@ -128,11 +134,7 @@ public class RepeatedMapVector extends AbstractMapVector
     if (getAccessor().getValueCount() == 0) {
       return 0;
     }
-    long bufferSize = offsets.getBufferSize();
-    for (final ValueVector v : (Iterable<ValueVector>) this) {
-      bufferSize += v.getBufferSize();
-    }
-    return (int) bufferSize;
+    return offsets.getBufferSize() + super.getBufferSize();
   }
 
   @Override
@@ -141,7 +143,7 @@ public class RepeatedMapVector extends AbstractMapVector
       return 0;
     }
 
-    long bufferSize = 0;
+    long bufferSize = offsets.getBufferSizeFor(valueCount);
     for (final ValueVector v : (Iterable<ValueVector>) this) {
       bufferSize += v.getBufferSizeFor(valueCount);
     }
@@ -451,7 +453,7 @@ public class RepeatedMapVector extends AbstractMapVector
       bufOffset += vectorLength;
     }
 
-    assert bufOffset == buffer.capacity();
+    assert bufOffset == buffer.writerIndex();
   }
 
 
@@ -598,7 +600,8 @@ public class RepeatedMapVector extends AbstractMapVector
   }
 
   @Override
-  public int getAllocatedByteCount() {
-    return super.getAllocatedByteCount( ) + offsets.getAllocatedByteCount();
+  public void collectLedgers(Set<BufferLedger> ledgers) {
+    super.collectLedgers(ledgers);
+    offsets.collectLedgers(ledgers);
   }
 }


Mime
View raw message