Return-Path: X-Original-To: apmail-kafka-dev-archive@www.apache.org Delivered-To: apmail-kafka-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3785011611 for ; Wed, 16 Jul 2014 23:09:38 +0000 (UTC) Received: (qmail 2434 invoked by uid 500); 16 Jul 2014 23:09:33 -0000 Delivered-To: apmail-kafka-dev-archive@kafka.apache.org Received: (qmail 2393 invoked by uid 500); 16 Jul 2014 23:09:33 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 2377 invoked by uid 99); 16 Jul 2014 23:09:32 -0000 Received: from reviews-vm.apache.org (HELO reviews.apache.org) (140.211.11.40) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jul 2014 23:09:32 +0000 Received: from reviews.apache.org (localhost [127.0.0.1]) by reviews.apache.org (Postfix) with ESMTP id 4B9C41DB889; Wed, 16 Jul 2014 23:09:20 +0000 (UTC) Content-Type: multipart/alternative; boundary="===============8910727055564992328==" MIME-Version: 1.0 Subject: Re: Review Request 23516: Patch for KAFKA-1462 From: "Guozhang Wang" To: "kafka" , "Jun Rao" , "Guozhang Wang" Date: Wed, 16 Jul 2014 23:09:20 -0000 Message-ID: <20140716230920.14170.15520@reviews.apache.org> X-ReviewBoard-URL: https://reviews.apache.org Auto-Submitted: auto-generated Sender: "Guozhang Wang" X-ReviewGroup: kafka X-ReviewRequest-URL: https://reviews.apache.org/r/23516/ X-Sender: "Guozhang Wang" References: <20140715183637.14170.64270@reviews.apache.org> In-Reply-To: <20140715183637.14170.64270@reviews.apache.org> Reply-To: "Guozhang Wang" X-ReviewRequest-Repository: kafka --===============8910727055564992328== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23516/#review47940 ----------------------------------------------------------- clients/src/main/java/org/apache/kafka/common/Cluster.java Do we ever want to use * in imports? clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java What is this field used for? clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java I thought the broker will also check consumer id and generation id for offset fetch request? clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java This is a long lasting problem: replica_id here can not only be the id of a follower replica, but also one of the two values: "-1" for ordinary consumer, or "-2" for debugging consumer. We use this field into folds, 1) logging for trouble shooting, which can be helpful only when this is from a follower replica, 2) deciding if it is from the consumer or a replica, for this purpose we do not really care about this id value. We should probably rename this field. clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java Same as above. clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java For the customized rebalancer, will that be supported as: 1) user implement their rebalancer and re-deploy the server with this new code, 2) bounce the consumer setting the new rebalancer? Here I assume the "strategy" will just be the class name? - Guozhang Wang On July 15, 2014, 6:36 p.m., Jun Rao wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23516/ > ----------------------------------------------------------- > > (Updated July 15, 2014, 6:36 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1462 > https://issues.apache.org/jira/browse/KAFKA-1462 > > > Repository: kafka > > > Description > ------- > > 1. I kept the request objects in the server separated from those in the client. This is because (1) some of the existing request objects are part of old client api (FetechRequest, OffsetCommitRequest, etc) and we can't remove them until the old clients are removed, (2) changing existing request objects on the server side requires significant refactoring. > > 2. On the client side, I refactored existing request objects a bit. Now, every request/response object extends from a GenericStruct. GenericStruct provides a standard way for doing serialization and toString so that we don't have to do that on every request. Each request/response can be constructed in two ways: (1) by providing request specific fields; (2) by providing a struct. The latter is used for getting a request/response from its serialized format. > > 3. On the server side. What I did is to keep the existing requests more or less untouched. For new types of requests, create a thin wrapper on the server side so that it can leverage the request objects created on the client side. This way the server side object will share the serialization and the toString logic with the client side object. In order to do this, I removed correlationId from the RequestOrResponse object. There is only one place where correlationId is directly referenced and it is not necessary. > > 4. Multi-version support. We now need to support two versions of OffsetCommitRequest since for the new consumer work, we added two extra fields in the request. For simplicity, both versions are converted to the same request object. Since the old version doesn't have the new fields, defaults will be used. > > 5. The new requests/responses are based on the format described in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats. I made some minor changes to the wiki so that the new requests follow the current standard. > > 6. Added some missing util functions and added unit test for testing the serialization/deserialization logic. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4 > clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309 > clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939 > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 > clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628 > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java f35bd87cf0c52a30ed779b25d60bbe64a60b9502 > clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 2652c32f123b3bc4b0456d4bc9fbba52c051724c > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 6036f6af1c55c1b0a15471e79b229b17f50ce31c > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca > clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 66cc2fea6443968e525419a203dbc4227e0b1cdf > clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 257b8287757e40349ea041ed7a651993007a55a8 > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 2f98192b064d1ce7c0779e901293edb8c3801915 > clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java PRE-CREATION > core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala dfad6e6534dd9b00099d110804899080e8d832ab > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala c72ca14708a3625cb89d5fb92630138d2afa2bf0 > core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 7dacb2023788064b736df8b775aaf12281d545b5 > core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 46ec3db28f88bbf9e0b0de2133807dc552bcae13 > core/src/main/scala/kafka/api/FetchRequest.scala a8b73acd1a813284744359e8434cb52d22063c99 > core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 3e408174dcc7e8dd9097bae41277ee4f7160afb3 > core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala f63644448bb5a1d560f79427284ccbac9d46b789 > core/src/main/scala/kafka/api/OffsetCommitRequest.scala 630768ab57afb579049bcbc5d44ee6823b0e7cc2 > core/src/main/scala/kafka/api/OffsetCommitResponse.scala 4946e9729ecbf3da35bdab5c832d26977c107e9e > core/src/main/scala/kafka/api/OffsetFetchRequest.scala a32f8588ff02f5fb3c99fb8e5508f462923e8edc > core/src/main/scala/kafka/api/OffsetFetchResponse.scala c1222f422ddb6413bbb2e5da2980903ee70b9156 > core/src/main/scala/kafka/api/OffsetRequest.scala 7cbc26c6e38420aa57046a76087fe6d15df72477 > core/src/main/scala/kafka/api/OffsetResponse.scala 0e1d6e362a1cec8250cf3930d3046058be4ae192 > core/src/main/scala/kafka/api/ProducerRequest.scala 0c295a2fe6712a77cd24719cb42015e2f787b08d > core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 > core/src/main/scala/kafka/api/RequestKeys.scala fbfc9d3aeaffed4ca85902125fcc1050086835db > core/src/main/scala/kafka/api/RequestOrResponse.scala 57f87a48c5e87220e7f377b23d2bbfa0d16350dc > core/src/main/scala/kafka/api/StopReplicaRequest.scala 68fc1389ee71122adb716d9d821dd8987a78ecee > core/src/main/scala/kafka/api/StopReplicaResponse.scala c90ddee3d820472236ab554cddd2e0db24233ae3 > core/src/main/scala/kafka/api/TopicMetadataRequest.scala a319f2f438bfd84c63edb330685b2b41d4b08aa0 > core/src/main/scala/kafka/api/TopicMetadataResponse.scala f6b7429faeab34d0938cb2f78ce91021be7c4b85 > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 543e262b25a946abd84dd58dc5fcee67c6252375 > core/src/main/scala/kafka/api/UpdateMetadataResponse.scala c583c1f00c89a993fb9dc280f190c32ea895dca5 > core/src/main/scala/kafka/controller/ControllerChannelManager.scala 8763968fbff697e4c5c98ab1274627c192a4d26a > core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 08dcc5553ccac7fbec0ea2662b402e2cec079e48 > core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 7e6da164a26b1893c26c624a9998d4fedf8af95e > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a2117b34c2ee3554602fe068eed0c90b075958c1 > > Diff: https://reviews.apache.org/r/23516/diff/ > > > Testing > ------- > > > Thanks, > > Jun Rao > > --===============8910727055564992328==--