kafka-dev mailing list archives

From "Guozhang Wang" <guw...@linkedin.com>
Subject Re: Review Request 18299: Fix KAFKA-1253
Date Wed, 19 Mar 2014 00:10:25 GMT


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > 1. There are two unit test failures.
> > 
> > RecordAccumulatorTest. testFull
> > RecordAccumulatorTest. testLinger
> > 
> > 2. The issue with compressed message size exceeding the request size is still not resolved.
> 
> Guozhang Wang wrote:
>     Hi Jun,
>     
>     This patch is not final yet; I am still debugging some issues found in the system
>     and perf tests. Could you take another look once I upload the final version? This
>     version is just for bookkeeping; sorry for the confusion.

These two tests no longer fail in my latest patch; the earlier failures were probably related
to some bugs I found during the system tests.

When a single batch is larger than the max request size, we will drop it on the floor and log
an error.
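The check described above can be sketched roughly as follows (a minimal illustration with hypothetical names, not the actual KAFKA-1253 code):

```java
// Illustrative sketch: reject a batch whose serialized size exceeds the
// configured max request size, logging an error instead of sending it.
public final class BatchSizeGuard {
    private final int maxRequestSize;

    public BatchSizeGuard(int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    /** Returns true if the batch may be sent; false means it was dropped. */
    public boolean admit(int batchSizeInBytes) {
        if (batchSizeInBytes > maxRequestSize) {
            System.err.println("Dropping batch of " + batchSizeInBytes
                + " bytes: exceeds max request size of " + maxRequestSize);
            return false;
        }
        return true;
    }
}
```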


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 169-170
> > <https://reviews.apache.org/r/18299/diff/13/?file=525728#file525728line169>
> >
> >     If we want to be a bit conservative, shouldn't we take the max of the two estimates?

According to the perf tests, taking the min is already quite effective at reducing the likelihood
of reallocation; a more conservative estimate, on the other hand, wastes more memory.
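To make the trade-off concrete, here is a hedged sketch of the "written bytes estimate" idea (the real Compressor logic may differ; the static ratio and the +1 padding are assumptions): the compressed size of a batch is estimated as the uncompressed size times the smaller of a static per-codec ratio and a ratio observed from previous batches.

```java
// Illustrative estimator: taking the min of the two ratios keeps the
// estimate tight and avoids over-allocating the buffer, at the cost of an
// occasional reallocation when a batch compresses worse than expected.
public final class CompressionEstimator {
    private final float staticRatio;  // assumed per-codec default, e.g. 0.5f
    private float observedRatio;      // ratio measured on previous batches

    public CompressionEstimator(float staticRatio) {
        this.staticRatio = staticRatio;
        this.observedRatio = staticRatio;
    }

    public int estimate(int uncompressedBytes) {
        float ratio = Math.min(staticRatio, observedRatio);
        return (int) (uncompressedBytes * ratio) + 1; // +1 avoids a zero-size buffer
    }

    public void observe(int uncompressedBytes, int compressedBytes) {
        observedRatio = (float) compressedBytes / uncompressedBytes;
    }
}
```

Taking the max instead would be the more conservative choice Jun suggests: fewer reallocations, but every batch would reserve memory for the worst observed case.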


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, lines 78-79
> > <https://reviews.apache.org/r/18299/diff/13/?file=525729#file525729line78>
> >
> >     Why do we need to mark this position?

It is not ByteBuffer.mark() but Compressor.mark(), which tells the compressor that one record
of the given size has been written. I will rename it to avoid the confusion.
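The bookkeeping described above amounts to something like the following (the method name here is illustrative, standing in for the renamed Compressor.mark()):

```java
// Hedged sketch: after each record is appended, the compressor is told how
// many uncompressed bytes were written, so it can maintain the record count
// and the statistics behind the written-bytes estimate. Unrelated to
// ByteBuffer.mark(), which saves a buffer position.
public final class RecordBookkeeper {
    private int numRecords;
    private long uncompressedBytes;

    /** Called once per appended record with its uncompressed size. */
    public void recordWritten(int sizeInBytes) {
        numRecords++;
        uncompressedBytes += sizeInBytes;
    }

    public int numRecords() { return numRecords; }
    public long uncompressedBytes() { return uncompressedBytes; }
}
```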


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, lines 172-173
> > <https://reviews.apache.org/r/18299/diff/13/?file=525729#file525729line172>
> >
> >     This comment is still not clear to me.

Reworded again.


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, lines 40-43
> > <https://reviews.apache.org/r/18299/diff/13/?file=525737#file525737line40>
> >
> >     Not sure how this test works. We send messages randomly to one of the four partitions,
> >     yet we only read data from partition 0. Do we need to have 4 partitions in the topic?
> >     Do we need more than 1 broker?

When the topic is created, only one partition is actually used; that is why this test works. But
I agree that we do not actually need 4 partitions and 2 servers. Will change that.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37585
-----------------------------------------------------------


On March 17, 2014, 10:56 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 17, 2014, 10:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 5bed60730ea108684bea2440af5a008feb0ada61 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 7a03f389cc6a4d56152b882555d7498af9a04d20 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala f12a45becb11a8bed586024866235b268630fec6 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 1, linger
> time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
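The in-place compression scheme summarized in the review request above (dynamic reallocation of the underlying byte buffer, with the pool deallocating by the original capacity) can be sketched as follows. This is a hedged illustration, not the actual ByteBufferOutputStream from the patch; the class name and the growth factor are assumptions.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Illustrative expandable buffer: writes that overflow the current buffer
// trigger a reallocation to a larger one, while the pool can still free
// memory according to the capacity it originally handed out.
public final class ExpandableBufferStream extends OutputStream {
    private static final float GROWTH_FACTOR = 1.1f; // assumed growth policy
    private final int initialCapacity;               // pool deallocates by this size
    private ByteBuffer buffer;

    public ExpandableBufferStream(int initialCapacity) {
        this.initialCapacity = initialCapacity;
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    @Override
    public void write(int b) {
        if (!buffer.hasRemaining())
            expand(buffer.capacity() + 1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] src, int off, int len) {
        if (buffer.remaining() < len)
            expand(buffer.position() + len);
        buffer.put(src, off, len);
    }

    private void expand(int minCapacity) {
        int newCapacity = Math.max((int) (buffer.capacity() * GROWTH_FACTOR), minCapacity);
        ByteBuffer grown = ByteBuffer.allocate(newCapacity);
        buffer.flip();     // switch the old buffer to read mode
        grown.put(buffer); // copy the bytes written so far
        buffer = grown;
    }

    /** The capacity the pool originally allocated, used at deallocation time. */
    public int initialCapacity() { return initialCapacity; }

    public ByteBuffer buffer() { return buffer; }
}
```

A good written-bytes estimate makes the initial capacity close to the final compressed size, so the expand path above is rarely taken.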

