parquet-commits mailing list archives

From b...@apache.org
Subject [28/50] [abbrv] parquet-mr git commit: PARQUET-642: Improve performance of ByteBuffer based read / write paths
Date Thu, 19 Jan 2017 01:27:39 GMT
PARQUET-642: Improve performance of ByteBuffer based read / write paths

While trying out the newest Parquet version, we noticed that the commits that introduced the ByteBuffer-based paths:
https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 and https://github.com/apache/parquet-mr/commit/6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8
(mostly Avro changes, plus a couple of ByteBuffer changes) slowed our jobs down slightly.

Read overhead: 4-6% (in MB_Millis)
Write overhead: 6-10% (in MB_Millis)

This appears to be due to the encoding / decoding of Strings in the [Binary class](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java):
[toStringUsingUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L388) - for reads
[encodeUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L236) - for writes
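
For context, a minimal sketch of the read-path idea (the same strategy `toStringUsingUTF8()` uses in the diff below): decode via the String constructor when the ByteBuffer is array-backed, and fall back to a Charset decode for direct buffers. The class and method names here are illustrative only, and StandardCharsets stands in for the patch's "UTF-8" charset lookup:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Utf8DecodeSketch {

  // Decode a UTF-8 ByteBuffer to a String, preferring the String constructor
  // over the backing array when one is available.
  static String decodeUtf8(ByteBuffer value) {
    if (value.hasArray()) {
      // Heap buffer: read straight from the backing array, no extra copy.
      return new String(value.array(), value.arrayOffset() + value.position(),
          value.remaining(), StandardCharsets.UTF_8);
    }
    // Direct buffer: fall back to the Charset decoder; duplicate() leaves the
    // original buffer's position and limit untouched.
    return StandardCharsets.UTF_8.decode(value.duplicate()).toString();
  }

  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.wrap("héllo parquet".getBytes(StandardCharsets.UTF_8));
    System.out.println(decodeUtf8(heap)); // prints: héllo parquet
  }
}
```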

With these changes we see around a 5% improvement in MB_Millis when running the job on our
Hadoop cluster.

Added some microbenchmark details to the jira.
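
The actual numbers are on the JIRA; purely to illustrate the shape of the comparison on the write side, a JMH-style sketch (hypothetical class name and input, not the benchmark attached to the JIRA) could look like:

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class Utf8EncodeBench {
  private String value;
  private CharsetEncoder encoder;

  @Setup
  public void setup() {
    value = "a typical short string column value";
    encoder = StandardCharsets.UTF_8.newEncoder();
  }

  // Write path for plain Strings: String.getBytes, which the JDK optimizes heavily.
  @Benchmark
  public ByteBuffer stringGetBytes() {
    return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
  }

  // Write path for arbitrary CharSequences: encode through a CharsetEncoder
  // without materializing an intermediate String.
  @Benchmark
  public ByteBuffer charsetEncoder() throws CharacterCodingException {
    return encoder.encode(CharBuffer.wrap((CharSequence) value));
  }
}
```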

Note that I've left the behavior of the Avro write path unchanged: it still uses CharSequence
and the Charset-based encoders.
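
At the call site the choice just becomes which factory method to use; the sketch below mirrors the AvroWriteSupport hunk in the diff (fromAvroString is a hypothetical wrapper name, for illustration only):

```java
import org.apache.avro.util.Utf8;
import org.apache.parquet.io.api.Binary;

public class AvroStringConversionSketch {

  // Mirrors the AvroWriteSupport change in the diff below.
  static Binary fromAvroString(Object value) {
    if (value instanceof Utf8) {
      Utf8 utf8 = (Utf8) value;
      // Avro's Utf8 already holds UTF-8 bytes; wrap them without re-encoding.
      return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
    }
    // Encode the CharSequence directly. Binary.fromString(value.toString())
    // would first copy the characters into a String and then encode them,
    // i.e. two copies instead of one.
    return Binary.fromCharSequence((CharSequence) value);
  }
}
```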

Author: Piyush Narang <pnarang@twitter.com>

Closes #347 from piyushnarang/bytebuffer-encoding-fix-pr and squashes the following commits:

43c5bdd [Piyush Narang] Keep avro on char sequence
2d50c8c [Piyush Narang] Update Binary approach
9e58237 [Piyush Narang] Proof of concept fixes

Conflicts:
    parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
    parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
Resolution:
    Use String encoding/decoding where possible.
    Updated Avro to use fromCharSequence to avoid two copies


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/1535970c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/1535970c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/1535970c

Branch: refs/heads/parquet-1.8.x
Commit: 1535970c5072633a69413cdc070208d62c6e0431
Parents: 35cf1b4
Author: Piyush Narang <pnarang@twitter.com>
Authored: Thu Jun 30 09:50:59 2016 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .../apache/parquet/avro/AvroWriteSupport.java   |  2 +-
 .../java/org/apache/parquet/io/api/Binary.java  | 63 +++++++++++++++++---
 2 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1535970c/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index c75bb03..29dc9a1 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -283,7 +283,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       Utf8 utf8 = (Utf8) value;
       return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
     }
-    return Binary.fromString(value.toString());
+    return Binary.fromCharSequence((CharSequence) value);
   }
 
   private static GenericData getDataModel(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1535970c/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index f88d740..a2f686d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -25,8 +25,12 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
 import java.util.Arrays;
 
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.ParquetEncodingException;
 
 import static org.apache.parquet.bytes.BytesUtils.UTF8;
@@ -195,21 +199,49 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
-  private static class FromStringBinary extends ByteArrayBackedBinary {
+  private static class FromStringBinary extends ByteBufferBackedBinary {
     public FromStringBinary(String value) {
-      // reused is false, because we do not
-      // hold on to the underlying bytes,
-      // and nobody else has a handle to them
+      // reused is false, because we do not hold on to the buffer after
+      // conversion, and nobody else has a handle to it
       super(encodeUTF8(value), false);
     }
 
-    private static byte[] encodeUTF8(String value) {
+    @Override
+    public String toString() {
+      return "Binary{\"" + toStringUsingUTF8() + "\"}";
+    }
+
+    private static ByteBuffer encodeUTF8(String value) {
       try {
-        return value.getBytes("UTF-8");
+        return ByteBuffer.wrap(value.getBytes("UTF-8"));
       } catch (UnsupportedEncodingException e) {
         throw new ParquetEncodingException("UTF-8 not supported.", e);
       }
     }
+  }
+
+  private static class FromCharSequenceBinary extends ByteBufferBackedBinary {
+    public FromCharSequenceBinary(CharSequence value) {
+      // reused is false, because we do not hold on to the buffer after
+      // conversion, and nobody else has a handle to it
+      super(encodeUTF8(value), false);
+    }
+
+    private static final ThreadLocal<CharsetEncoder> ENCODER =
+      new ThreadLocal<CharsetEncoder>() {
+        @Override
+        protected CharsetEncoder initialValue() {
+          return UTF8.newEncoder();
+        }
+      };
+
+    private static ByteBuffer encodeUTF8(CharSequence value) {
+      try {
+        return ENCODER.get().encode(CharBuffer.wrap(value));
+      } catch (CharacterCodingException e) {
+        throw new ParquetEncodingException("UTF-8 not supported.", e);
+      }
+    }
 
     @Override
     public String toString() {
@@ -340,7 +372,18 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
     @Override
     public String toStringUsingUTF8() {
-      return UTF8.decode(value).toString();
+      String ret;
+      if (value.hasArray()) {
+        try {
+          ret = new String(value.array(), value.arrayOffset() + value.position(), value.remaining(), "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+          throw new ParquetDecodingException("UTF-8 not supported");
+        }
+      } else {
+        ret = UTF8.decode(value.duplicate()).toString();
+      }
+
+      return ret;
     }
 
     @Override
@@ -472,10 +515,14 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
   }
 
-  public static Binary fromString(final String value) {
+  public static Binary fromString(String value) {
     return new FromStringBinary(value);
   }
 
+  public static Binary fromCharSequence(CharSequence value) {
+    return new FromCharSequenceBinary(value);
+  }
+
   /**
    * @see {@link Arrays#hashCode(byte[])}
    * @param array

