parquet-commits mailing list archives

From alexleven...@apache.org
Subject [1/2] incubator-parquet-mr git commit: PARQUET-160: avoid wasting 64K per empty buffer.
Date Thu, 05 Mar 2015 01:27:02 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master fa8957d79 -> d084ad29e


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
index 580bb34..d307471 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -18,6 +18,12 @@
  */
 package parquet.bytes;
 
+import static java.lang.Math.max;
+import static java.lang.Math.pow;
+import static java.lang.String.format;
+import static java.lang.System.arraycopy;
+import static parquet.Preconditions.checkArgument;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -25,113 +31,155 @@ import java.util.ArrayList;
 import java.util.List;
 
 import parquet.Log;
-import parquet.Preconditions;
 
 /**
- * functionality of ByteArrayOutputStream without the memory and copy overhead
+ * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
+ * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
+ * stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
  *
- * It will linearly create a new slab of the initial size when needed (instead of creating a new buffer and copying the data).
- * After 10 slabs their size will increase exponentially (similar to {@link ByteArrayOutputStream} behavior) by making the new slab size the size of the existing data.
+ * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
+ * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
+ * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
+ * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
+ * So new slabs are allocated to be 1/5th of the max capacity hint,
+ * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
+ * twice the needed space when a new slab is added just before the stream is done being used.
  *
- * When reusing a buffer it will adjust the slab size based on the previous data size ({@link CapacityByteArrayOutputStream#reset()})
+ * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer
+ * allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
+ * See ({@link CapacityByteArrayOutputStream#reset()}).
  *
  * @author Julien Le Dem
  *
  */
 public class CapacityByteArrayOutputStream extends OutputStream {
   private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class);
+  private static final byte[] EMPTY_SLAB = new byte[0];
 
-  private static final int MINIMUM_SLAB_SIZE = 64 * 1024;
-  private static final int EXPONENTIAL_SLAB_SIZE_THRESHOLD = 10;
+  private int initialSlabSize;
+  private final int maxCapacityHint;
+  private final List<byte[]> slabs = new ArrayList<byte[]>();
 
-  private int slabSize;
-  private List<byte[]> slabs = new ArrayList<byte[]>();
   private byte[] currentSlab;
-  private int capacity;
   private int currentSlabIndex;
-  private int currentSlabPosition;
-  private int size;
+  private int bytesAllocated = 0;
+  private int bytesUsed = 0;
 
   /**
-   * @param initialSize the initialSize of the buffer (also slab size)
+   * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
+   * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be
+   * a balance between the overhead of creating new slabs and wasting memory by eagerly making
+   * initial slabs too big.
+   *
+   * Note that targetCapacity here need not match maxCapacityHint in the constructor of
+   * CapacityByteArrayOutputStream, though often that would make sense.
+   *
+   * @param minSlabSize no matter what we shouldn't make slabs any smaller than this
+   * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have?
+   * @param targetNumSlabs how many slabs should it take to reach targetCapacity?
    */
-  public CapacityByteArrayOutputStream(int initialSize) {
-    Preconditions.checkArgument(initialSize > 0, "initialSize must be > 0");
-    initSlabs(initialSize);
+  public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
+    // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
+    // before reaching the targetCapacity
+    // eg for page size of 1MB we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
   }
 
-  private void initSlabs(int initialSize) {
-    if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize));
-    this.slabSize = initialSize;
-    this.slabs.clear();
-    this.capacity = initialSize;
-    this.currentSlab = new byte[slabSize];
-    this.slabs.add(currentSlab);
-    this.currentSlabIndex = 0;
-    this.currentSlabPosition = 0;
-    this.size = 0;
+  /**
+   * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
+   * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
+   */
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+
+    return new CapacityByteArrayOutputStream(
+        initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
+        maxCapacityHint);
+  }
+
+  /**
+   * Defaults maxCapacityHint to 1MB
+   * @param initialSlabSize
+   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)}
+   */
+  @Deprecated
+  public CapacityByteArrayOutputStream(int initialSlabSize) {
+    this(initialSlabSize, 1024 * 1024);
+  }
+
+  /**
+   * @param initialSlabSize the size to make the first slab
+   * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+   */
+  public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
+    checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
+    checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
+    checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
+    this.initialSlabSize = initialSlabSize;
+    this.maxCapacityHint = maxCapacityHint;
+    reset();
   }
 
+  /**
+   * the new slab is guaranteed to be at least minimumSize
+   * @param minimumSize the size of the data we want to copy in the new slab
+   */
   private void addSlab(int minimumSize) {
-    this.currentSlabIndex += 1;
-    if (currentSlabIndex < this.slabs.size()) {
-      // reuse existing slab
-      this.currentSlab = this.slabs.get(currentSlabIndex);
-      if (Log.DEBUG) LOG.debug(String.format("reusing slab of size %d", currentSlab.length));
-      if (currentSlab.length < minimumSize) {
-        if (Log.DEBUG) LOG.debug(String.format("slab size %,d too small for value of size %,d. replacing slab", currentSlab.length, minimumSize));
-        byte[] newSlab = new byte[minimumSize];
-        capacity += minimumSize - currentSlab.length;
-        this.currentSlab = newSlab;
-        this.slabs.set(currentSlabIndex, newSlab);
-      }
+    int nextSlabSize;
+
+    if (bytesUsed == 0) {
+      nextSlabSize = initialSlabSize;
+    } else if (bytesUsed > maxCapacityHint / 5) {
+      // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size
+      nextSlabSize = maxCapacityHint / 5;
     } else {
-      if (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD) {
-        // make slabs bigger in case we are creating too many of them
-        // double slab size every time.
-        this.slabSize = size;
-        if (Log.DEBUG) LOG.debug(String.format("used %d slabs, new slab size %d", currentSlabIndex, slabSize));
-      }
-      if (slabSize < minimumSize) {
-        if (Log.DEBUG) LOG.debug(String.format("slab size %,d too small for value of size %,d. Bumping up slab size", slabSize, minimumSize));
-        this.slabSize = minimumSize;
-      }
-      if (Log.DEBUG) LOG.debug(String.format("new slab of size %d", slabSize));
-      this.currentSlab = new byte[slabSize];
-      this.slabs.add(currentSlab);
-      this.capacity += slabSize;
+      // double the size every time
+      nextSlabSize = bytesUsed;
+    }
+
+    if (nextSlabSize < minimumSize) {
+      if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. Bumping up slab size", nextSlabSize, minimumSize));
+      nextSlabSize = minimumSize;
     }
-    this.currentSlabPosition = 0;
+
+    if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize));
+
+    this.currentSlab = new byte[nextSlabSize];
+    this.slabs.add(currentSlab);
+    this.bytesAllocated += nextSlabSize;
+    this.currentSlabIndex = 0;
   }
 
   @Override
   public void write(int b) {
-    if (currentSlabPosition == currentSlab.length) {
+    if (currentSlabIndex == currentSlab.length) {
       addSlab(1);
     }
-    currentSlab[currentSlabPosition] = (byte) b;
-    currentSlabPosition += 1;
-    size += 1;
+    currentSlab[currentSlabIndex] = (byte) b;
+    currentSlabIndex += 1;
+    bytesUsed += 1;
   }
 
   @Override
   public void write(byte b[], int off, int len) {
     if ((off < 0) || (off > b.length) || (len < 0) ||
         ((off + len) - b.length > 0)) {
-      throw new IndexOutOfBoundsException();
+      throw new IndexOutOfBoundsException(
+          String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
     }
-    if (currentSlabPosition + len >= currentSlab.length) {
-      final int length1 = currentSlab.length - currentSlabPosition;
-      System.arraycopy(b, off, currentSlab, currentSlabPosition, length1);
+    if (currentSlabIndex + len >= currentSlab.length) {
+      final int length1 = currentSlab.length - currentSlabIndex;
+      arraycopy(b, off, currentSlab, currentSlabIndex, length1);
       final int length2 = len - length1;
       addSlab(length2);
-      System.arraycopy(b, off + length1, currentSlab, currentSlabPosition, length2);
-      currentSlabPosition = length2;
+      arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2);
+      currentSlabIndex = length2;
     } else {
-      System.arraycopy(b, off, currentSlab, currentSlabPosition, len);
-      currentSlabPosition += len;
+      arraycopy(b, off, currentSlab, currentSlabIndex, len);
+      currentSlabIndex += len;
     }
-    size += len;
+    bytesUsed += len;
   }
 
   /**
@@ -142,71 +190,52 @@ public class CapacityByteArrayOutputStream extends OutputStream {
    * @exception  IOException  if an I/O error occurs.
    */
   public void writeTo(OutputStream out) throws IOException {
-    for (int i = 0; i < currentSlabIndex; i++) {
+    for (int i = 0; i < slabs.size() - 1; i++) {
       final byte[] slab = slabs.get(i);
-      out.write(slab, 0, slab.length);
+      out.write(slab);
     }
-    out.write(currentSlab, 0, currentSlabPosition);
+    out.write(currentSlab, 0, currentSlabIndex);
   }
 
   /**
-   * @return the size of the allocated buffer
+   * @return The total size in bytes of data written to this stream.
+   */
+  public long size() {
+    return bytesUsed;
+  }
+
+  /**
+   *
+   * @return The total size in bytes currently allocated for this stream.
    */
   public int getCapacity() {
-    return capacity;
+    return bytesAllocated;
   }
 
   /**
    * When re-using an instance with reset, it will adjust slab size based on previous data size.
    * The intent is to reuse the same instance for the same type of data (for example, the same column).
-   * The assumption is that the size in the buffer will be consistent. Otherwise we fall back to exponentialy double the slab size.
-   * <ul>
-   * <li>if we used less than half of the first slab (and it is above the minimum slab size), we will make the slab size smaller.
-   * <li>if we used more than the slab count threshold (10), we will re-adjust the slab size.
-   * </ul>
-   * if re-adjusting the slab size we will make it 1/5th of the previous used size in the buffer so that overhead of extra memory allocation is about 20%
-   * If we used less than the available slabs we free up the unused ones to reduce memory overhead.
+   * The assumption is that the size in the buffer will be consistent.
    */
   public void reset() {
-    // heuristics to adjust slab size
-    if (
-        // if we have only one slab, make sure it is not way too big (more than twice what we need). Except if the slab is already small
-        (currentSlabIndex == 0 && currentSlabPosition < currentSlab.length / 2 && currentSlab.length > MINIMUM_SLAB_SIZE)
-        ||
-        // we want to avoid generating too many slabs.
-        (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD)
-        ){
-      // readjust slab size
-      initSlabs(Math.max(size / 5, MINIMUM_SLAB_SIZE)); // should make overhead to about 20% without incurring many slabs
-      if (Log.DEBUG) LOG.debug(String.format("used %d slabs, new slab size %d", currentSlabIndex + 1, slabSize));
-    } else if (currentSlabIndex < slabs.size() - 1) {
-      // free up the slabs that we are not using. We want to minimize overhead
-      this.slabs = new ArrayList<byte[]>(slabs.subList(0, currentSlabIndex + 1));
-      this.capacity = 0;
-      for (byte[] slab : slabs) {
-        capacity += slab.length;
-      }
-    }
+    // readjust slab size.
+    // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
+    this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
+    if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize));
+    this.slabs.clear();
+    this.bytesAllocated = 0;
+    this.bytesUsed = 0;
+    this.currentSlab = EMPTY_SLAB;
     this.currentSlabIndex = 0;
-    this.currentSlabPosition = 0;
-    this.currentSlab = slabs.get(currentSlabIndex);
-    this.size = 0;
-  }
-
-  /**
-   * @return the size of the buffered data
-   */
-  public long size() {
-    return size;
   }
 
   /**
-   * @return the index of the last value being written to this stream, which
+   * @return the index of the last value written to this stream, which
    * can be passed to {@link #setByte(long, byte)} in order to change it
    */
   public long getCurrentIndex() {
-    Preconditions.checkArgument(size > 0, "This is an empty stream");
-    return size - 1;
+    checkArgument(bytesUsed > 0, "This is an empty stream");
+    return bytesUsed - 1;
   }
 
   /**
@@ -216,11 +245,10 @@ public class CapacityByteArrayOutputStream extends OutputStream {
    * @param value the value to replace it with
    */
   public void setByte(long index, byte value) {
-    Preconditions.checkArgument(index < size,
-      "Index: " + index + " is >= the current size of: " + size);
+    checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed);
 
     long seen = 0;
-    for (int i = 0; i <=currentSlabIndex; i++) {
+    for (int i = 0; i < slabs.size(); i++) {
       byte[] slab = slabs.get(i);
       if (index < seen + slab.length) {
         // ok found index
@@ -236,11 +264,11 @@ public class CapacityByteArrayOutputStream extends OutputStream {
    * @return a text representation of the memory usage of this structure
    */
   public String memUsageString(String prefix) {
-    return String.format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity());
+    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity());
   }
 
   /**
-   * @return the total count of allocated slabs
+   * @return the total number of allocated slabs
    */
   int getSlabCount() {
     return slabs.size();

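A minimal usage sketch of the new API may help here (illustration only, not part of the patch; the sizes are hypothetical and simply trace the growth and reset rules described in the new javadoc and the initialSlabSizeHeuristic comment above):

    import parquet.bytes.CapacityByteArrayOutputStream;

    public class SlabGrowthSketch {
      public static void main(String[] args) {
        // hypothetical sizes: 1 KB initial slab, hint that roughly 1 MB will be written in total
        CapacityByteArrayOutputStream out =
            new CapacityByteArrayOutputStream(1024, 1024 * 1024);

        // equivalent construction via the heuristic: max(64, 1 MB / 2^10) = 1024
        CapacityByteArrayOutputStream out2 =
            CapacityByteArrayOutputStream.withTargetNumSlabs(64, 1024 * 1024, 10);

        byte[] chunk = new byte[512];
        for (int i = 0; i < 2048; i++) {
          out.write(chunk, 0, chunk.length);
        }
        // slab sizes roughly double while the stream is small, then are capped at
        // maxCapacityHint / 5 once the written size approaches the hint
        System.out.println(out.size() + " bytes used, "
            + out.getCapacity() + " bytes allocated");

        out.reset();
        // after reset() the next initial slab is max(bytesUsed / 7, previous initialSlabSize),
        // so about three doublings bring a re-used stream back to its previous size
      }
    }
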
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java
new file mode 100644
index 0000000..6a8437b
--- /dev/null
+++ b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+public class ConcatenatingByteArrayCollector extends BytesInput {
+  private final List<byte[]> slabs = new ArrayList<byte[]>();
+  private long size = 0;
+
+  public void collect(BytesInput bytesInput) throws IOException {
+    byte[] bytes = bytesInput.toByteArray();
+    slabs.add(bytes);
+    size += bytes.length;
+  }
+
+  public void reset() {
+    size = 0;
+    slabs.clear();
+  }
+
+  @Override
+  public void writeAllTo(OutputStream out) throws IOException {
+    for (byte[] slab : slabs) {
+      out.write(slab);
+    }
+  }
+
+  @Override
+  public long size() {
+    return size;
+  }
+
+  /**
+   * @param prefix  a prefix to be used for every new line in the string
+   * @return a text representation of the memory usage of this structure
+   */
+  public String memUsageString(String prefix) {
+    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
+  }
+
+}

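The new collector carries no class javadoc, so a small usage sketch may help (illustration only, assuming BytesInput.from(byte[]) from parquet-encoding as used elsewhere in the module):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import parquet.bytes.BytesInput;
    import parquet.bytes.ConcatenatingByteArrayCollector;

    public class CollectorSketch {
      public static void main(String[] args) throws IOException {
        ConcatenatingByteArrayCollector collector = new ConcatenatingByteArrayCollector();

        // each collect() materializes the BytesInput once and keeps it as one slab
        collector.collect(BytesInput.from(new byte[] {1, 2, 3}));
        collector.collect(BytesInput.from(new byte[] {4, 5}));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        collector.writeAllTo(out);   // writes the slabs back out in collection order
        System.out.println(collector.size() + " bytes collected");  // prints 5

        collector.reset();           // drops the slabs so the instance can be re-used
      }
    }
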
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java
index 72b52f5..41006f1 100644
--- a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -32,7 +32,7 @@ public class TestCapacityByteArrayOutputStream {
 
   @Test
   public void testWrite() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     final int expectedSize = 54;
     for (int i = 0; i < expectedSize; i++) {
       capacityByteArrayOutputStream.write(i);
@@ -43,7 +43,7 @@ public class TestCapacityByteArrayOutputStream {
 
   @Test
   public void testWriteArray() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     int v = 23;
     writeArraysOf3(capacityByteArrayOutputStream, v);
     validate(capacityByteArrayOutputStream, v * 3);
@@ -51,7 +51,7 @@ public class TestCapacityByteArrayOutputStream {
 
   @Test
   public void testWriteArrayAndInt() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     for (int i = 0; i < 23; i++) {
       byte[] toWrite = { (byte)(i * 3), (byte)(i * 3 + 1)};
       capacityByteArrayOutputStream.write(toWrite);
@@ -62,9 +62,13 @@ public class TestCapacityByteArrayOutputStream {
 
   }
 
+  protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) {
+    return new CapacityByteArrayOutputStream(10, 1000000);
+  }
+
   @Test
   public void testReset() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     for (int i = 0; i < 54; i++) {
       capacityByteArrayOutputStream.write(i);
       assertEquals(i + 1, capacityByteArrayOutputStream.size());
@@ -83,7 +87,7 @@ public class TestCapacityByteArrayOutputStream {
 
   @Test
   public void testWriteArrayBiggerThanSlab() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     int v = 23;
     writeArraysOf3(capacityByteArrayOutputStream, v);
     int n = v * 3;
@@ -109,7 +113,7 @@ public class TestCapacityByteArrayOutputStream {
 
   @Test
   public void testWriteArrayManySlabs() throws Throwable {
-    CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10);
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
     int it = 500;
     int v = 23;
     for (int j = 0; j < it; j++) {
@@ -137,7 +141,7 @@ public class TestCapacityByteArrayOutputStream {
   public void testReplaceByte() throws Throwable {
     // test replace the first value
     {
-      CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5);
+      CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5);
       cbaos.write(10);
       assertEquals(0, cbaos.getCurrentIndex());
       cbaos.setByte(0, (byte) 7);
@@ -148,7 +152,7 @@ public class TestCapacityByteArrayOutputStream {
 
     // test replace value in the first slab
     {
-      CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5);
+      CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5);
       cbaos.write(10);
       cbaos.write(13);
       cbaos.write(15);
@@ -163,7 +167,7 @@ public class TestCapacityByteArrayOutputStream {
 
     // test replace in *not* the first slab
     {
-      CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5);
+      CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5);
 
       // advance part way through the 3rd slab
       for (int i = 0; i < 12; i++) {
@@ -181,7 +185,7 @@ public class TestCapacityByteArrayOutputStream {
 
     // test replace last value of a slab
     {
-      CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5);
+      CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5);
 
       // advance part way through the 3rd slab
       for (int i = 0; i < 12; i++) {
@@ -199,7 +203,7 @@ public class TestCapacityByteArrayOutputStream {
 
     // test replace last value
     {
-      CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5);
+      CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5);
 
       // advance part way through the 3rd slab
       for (int i = 0; i < 12; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index b3cdd65..e3bab0d 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,9 @@
 package parquet.hadoop;
 
 import static parquet.Log.INFO;
+import static parquet.column.statistics.Statistics.getStatsBasedOnType;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,14 +31,13 @@ import java.util.Set;
 
 import parquet.Log;
 import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.bytes.ConcatenatingByteArrayCollector;
 import parquet.column.ColumnDescriptor;
 import parquet.column.Encoding;
 import parquet.column.page.DictionaryPage;
 import parquet.column.page.PageWriteStore;
 import parquet.column.page.PageWriter;
 import parquet.column.statistics.Statistics;
-import parquet.format.ColumnChunk;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.io.ParquetEncodingException;
@@ -52,7 +53,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     private final ColumnDescriptor path;
     private final BytesCompressor compressor;
 
-    private final CapacityByteArrayOutputStream buf;
+    private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
+    private final ConcatenatingByteArrayCollector buf;
     private DictionaryPage dictionaryPage;
 
     private long uncompressedLength;
@@ -64,11 +66,11 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
 
     private Statistics totalStatistics;
 
-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
       this.path = path;
       this.compressor = compressor;
-      this.buf = new CapacityByteArrayOutputStream(initialSize);
-      this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
+      this.buf = new ConcatenatingByteArrayCollector();
+      this.totalStatistics = getStatsBasedOnType(this.path.getType());
     }
 
     @Override
@@ -91,6 +93,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
             "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
             + compressedSize);
       }
+      tempOutputStream.reset();
       parquetMetadataConverter.writeDataPageHeader(
           (int)uncompressedSize,
           (int)compressedSize,
@@ -99,13 +102,15 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
           rlEncoding,
           dlEncoding,
           valuesEncoding,
-          buf);
+          tempOutputStream);
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
       this.totalStatistics.mergeStatistics(statistics);
-      compressedBytes.writeAllTo(buf);
+      // by concatenating before collecting instead of collecting twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
       encodings.add(rlEncoding);
       encodings.add(dlEncoding);
       encodings.add(valuesEncoding);
@@ -127,21 +132,30 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       int compressedSize = toIntWithCheck(
           compressedData.size() + repetitionLevels.size() + definitionLevels.size()
       );
+      tempOutputStream.reset();
       parquetMetadataConverter.writeDataPageV2Header(
           uncompressedSize, compressedSize,
           valueCount, nullCount, rowCount,
           statistics,
           dataEncoding,
-          rlByteLength, dlByteLength,
-          buf);
+          rlByteLength,
+          dlByteLength,
+          tempOutputStream);
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
       this.totalStatistics.mergeStatistics(statistics);
-      repetitionLevels.writeAllTo(buf);
-      definitionLevels.writeAllTo(buf);
-      compressedData.writeAllTo(buf);
+
+      // by concatenating before collecting instead of collecting twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      buf.collect(
+          BytesInput.concat(
+            BytesInput.from(tempOutputStream),
+            repetitionLevels,
+            definitionLevels,
+            compressedData)
+      );
       encodings.add(dataEncoding);
     }
 
@@ -165,7 +179,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
         writer.writeDictionaryPage(dictionaryPage);
         encodings.add(dictionaryPage.getEncoding());
       }
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
       writer.endColumn();
       if (INFO) {
         LOG.info(
@@ -183,7 +197,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
 
     @Override
     public long allocatedSize() {
-      return buf.getCapacity();
+      return buf.size();
     }
 
     @Override
@@ -206,10 +220,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;
 
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSize));
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, pageSize));
     }
   }
 

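The comment added in writePage above ("by concatenating before collecting...") is the heart of the change in this file. A stripped-down sketch of the same pattern, with stand-in byte arrays in place of a real page header and compressor (illustration only, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import parquet.bytes.BytesInput;
    import parquet.bytes.ConcatenatingByteArrayCollector;

    public class PageCollectSketch {
      public static void main(String[] args) throws IOException {
        ConcatenatingByteArrayCollector buf = new ConcatenatingByteArrayCollector();
        ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();

        BytesInput compressedBytes = BytesInput.from(new byte[] {42, 43, 44}); // stand-in for a compressed page

        tempOutputStream.reset();
        tempOutputStream.write(new byte[] {0, 1}); // stand-in for the serialized page header

        // one concat + one collect copies header and page into a single new slab,
        // instead of collecting them separately and allocating twice
        buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));

        System.out.println(buf.size() + " bytes buffered"); // prints 5
      }
    }
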
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 3e7e0e5..144515c 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -45,7 +45,6 @@ import parquet.schema.MessageType;
 class InternalParquetRecordWriter<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
 
-  private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
@@ -101,22 +100,11 @@ class InternalParquetRecordWriter<T> {
   }
 
   private void initStore() {
-    // we don't want this number to be too small
-    // ideally we divide the block equally across the columns
-    // it is unlikely all columns are going to be the same size.
-    // its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type.
-    // therefore this size is cast to int, since allocating byte array in under layer needs to
-    // limit the array size in an int scope.
-    int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
-    // we don't want this number to be too small either
-    // ideally, slightly bigger than the page size, but not bigger than the block buffer
-    int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
     columnStore = parquetProperties.newColumnWriteStore(
         schema,
         pageStore,
-        pageSize,
-        initialPageBufferSize);
+        pageSize);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
   }

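For context on the commit title ("avoid wasting 64K per empty buffer"): the removed heuristic above eagerly allocated at least MINIMUM_BUFFER_SIZE (64 KB) per column chunk writer. A back-of-the-envelope sketch with hypothetical numbers shows how that adds up for wide schemas (illustration only):

    public class OldBufferOverheadSketch {
      public static void main(String[] args) {
        // hypothetical numbers: 128 MB row groups, 500 columns in the schema
        long rowGroupSize = 128L * 1024 * 1024;
        int columns = 500;
        int minimumBufferSize = 64 * 1024; // the removed MINIMUM_BUFFER_SIZE

        // the removed sizing: max(64 KB, rowGroupSize / columns / 5) per column, allocated up front
        long perColumn = Math.max(minimumBufferSize, rowGroupSize / columns / 5);
        System.out.println("old up-front allocation per column: " + perColumn + " bytes");
        System.out.println("old up-front allocation total: " + (perColumn * columns) + " bytes");
        // tens of megabytes allocated before a single value is written; with this patch the
        // buffers instead grow on demand from small slabs toward the page size
      }
    }
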
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
index b1ec02e..28f6be2 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,6 @@ package parquet.hadoop;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.verify;
 import static parquet.column.Encoding.PLAIN;
 import static parquet.column.Encoding.RLE;
 import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
@@ -44,11 +43,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.mockito.InOrder;
 import org.mockito.Mockito;
-import org.mockito.internal.verification.VerificationModeFactory;
-import org.mockito.verification.VerificationMode;
+
 import parquet.bytes.BytesInput;
 import parquet.bytes.LittleEndianDataInputStream;
 import parquet.column.ColumnDescriptor;
@@ -63,8 +60,6 @@ import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
 import parquet.schema.Types;
 
 public class TestColumnChunkPageWriteStore {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
index fc679b5..a33790b 100644
--- a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
+++ b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
@@ -59,7 +59,7 @@ public class TupleConsumerPerfTest {
     MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));
 
     MemPageStore memPageStore = new MemPageStore(0);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
     write(memPageStore, columns, schema, pigSchema);
     columns.flush();
     read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/d084ad29/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
index 1083b0c..e5edb37 100644
--- a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
+++ b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
@@ -148,7 +148,7 @@ public class TestParquetReadProtocol {
     final MessageType schema = schemaConverter.convert(thriftClass);
     LOG.info(schema);
     final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, 10000, false, WriterVersion.PARQUET_1_0);
+    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0);
     final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     final StructType thriftType = schemaConverter.toStructType(thriftClass);
     ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);

