Return-Path: X-Original-To: apmail-kafka-dev-archive@www.apache.org Delivered-To: apmail-kafka-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DB30B10D1F for ; Tue, 18 Mar 2014 19:07:13 +0000 (UTC) Received: (qmail 40982 invoked by uid 500); 18 Mar 2014 19:07:13 -0000 Delivered-To: apmail-kafka-dev-archive@kafka.apache.org Received: (qmail 40621 invoked by uid 500); 18 Mar 2014 19:07:10 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 40595 invoked by uid 99); 18 Mar 2014 19:07:07 -0000 Received: from reviews-vm.apache.org (HELO reviews.apache.org) (140.211.11.40) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 19:07:07 +0000 Received: from reviews.apache.org (localhost [127.0.0.1]) by reviews.apache.org (Postfix) with ESMTP id ECDD51D54D2; Tue, 18 Mar 2014 19:07:04 +0000 (UTC) Content-Type: multipart/alternative; boundary="===============5214917724689997901==" MIME-Version: 1.0 Subject: Re: Review Request 18299: Fix KAFKA-1253 From: "Jun Rao" To: "kafka" , "Jun Rao" , "Guozhang Wang" Date: Tue, 18 Mar 2014 19:07:04 -0000 Message-ID: <20140318190704.27430.60115@reviews.apache.org> X-ReviewBoard-URL: https://reviews.apache.org Auto-Submitted: auto-generated Sender: "Jun Rao" X-ReviewGroup: kafka X-ReviewRequest-URL: https://reviews.apache.org/r/18299/ X-Sender: "Jun Rao" References: <20140317225655.17334.59868@reviews.apache.org> In-Reply-To: <20140317225655.17334.59868@reviews.apache.org> Reply-To: "Jun Rao" X-ReviewRequest-Repository: kafka --===============5214917724689997901== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/18299/#review37585 ----------------------------------------------------------- 1. There are two unit test failures. RecordAccumulatorTest. testFull RecordAccumulatorTest. testLinger 2. The issue with compressed message size exceeding the request size is still not resolved. clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java Could we write that as a private util function expandBuffer()? clients/src/main/java/org/apache/kafka/common/record/Compressor.java Should we add a debug level logging for those unexpected exceptions? clients/src/main/java/org/apache/kafka/common/record/Compressor.java If we want to be a bit conservative, shouldn't we take the max of the two estimates? clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java Why do we need to mark this position? clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java Ditto as the above? clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java Shouldn't >= be <= ? clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java This comment is still not clear to me. core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala Not sure how this test works. We send messages randomly to one of the four partitions, yet we only read data from partition 0. Do we need to have 4 partitions in the topic? Do we need more than 1 broker? - Jun Rao On March 17, 2014, 10:56 p.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/18299/ > ----------------------------------------------------------- > > (Updated March 17, 2014, 10:56 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1253 > https://issues.apache.org/jira/browse/KAFKA-1253 > > > Repository: kafka > > > Description > ------- > > In-place compression with > > 1) Dynamic reallocation in the underlying byte buffer > 2) Written bytes estimate to reduce reallocation probabilities > 3) Deallocation in buffer pool following the original capacity > > > Diffs > ----- > > build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a > clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 5bed60730ea108684bea2440af5a008feb0ada61 > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 7a03f389cc6a4d56152b882555d7498af9a04d20 > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4 > clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602 > clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 > clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560 > clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239 > clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9 > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad > clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d > clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION > clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c > core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 > perf/src/main/scala/kafka/perf/ProducerPerformance.scala f12a45becb11a8bed586024866235b268630fec6 > > Diff: https://reviews.apache.org/r/18299/diff/ > > > Testing > ------- > > integration tests > > unit tests > > stress tests (1K message size, 1M messages in producer performance with ack = 1, linger time = 0ms/500ms, random bit/all-ones messages) > > snappy dynamic load test > > > Thanks, > > Guozhang Wang > > --===============5214917724689997901==--