kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6430: Add buffer for gzip streams (#4537)
Date Fri, 16 Feb 2018 23:05:13 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new f3b3d7c  KAFKA-6430: Add buffer for gzip streams (#4537)
f3b3d7c is described below

commit f3b3d7c1d98175e33cac7e4c1dc763d3f197ca64
Author: ying-zheng <zheng.ying@rocketmail.com>
AuthorDate: Fri Feb 16 15:03:32 2018 -0800

    KAFKA-6430: Add buffer for gzip streams (#4537)
    
    As described in the JIRA ticket, this can double throughput.
---
 .../org/apache/kafka/common/record/CompressionType.java    | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 16d6e01..9b3bfc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandle;
@@ -49,8 +51,10 @@ public enum CompressionType {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
-                return new GZIPOutputStream(buffer, 8 * 1024);
+                // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
+                // number of bytes to write (potentially a single byte)
+                return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -59,7 +63,11 @@ public enum CompressionType {
         @Override
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new GZIPInputStream(new ByteBufferInputStream(buffer));
+                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
+                // number of bytes (potentially a single byte)
+                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
+                        16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message