kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guozhang Wang" <wangg...@gmail.com>
Subject Re: Review Request 35231: Address Onur and Jason's comments
Date Wed, 01 Jul 2015 22:48:06 GMT


> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be
null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId
would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the
generationId were 5, for example, would we want to just commit the offsets blindly? Or would
we throw an error?
> 
> Guozhang Wang wrote:
>     As Onur mentioned, when group == null it is also possible that the group has not
been created on the coordinator (when coordinator migrated, for example), and in this case
the consumerId / generationId would not be ""/-1.
> 
> Jason Gustafson wrote:
>     That makes sense. I was just thinking this might open the door to having commits
from old or invalid generations go through. Unless we store group metadata in zookeeper though,
perhaps there is no way to prevent it.
> 
> Onur Karaman wrote:
>     So I've been meaning to ask something similar.
>     
>     Guozhang: offline we talked about all offset logic validating generation id before
attempting to perform the action. To adjust for this proposed check, at one point we talked
about making ConsumerCoordinator more strictly follow the wiki and have the generation id
bump happen at the end of rebalance instead of at the beginning so that consumers would be
able to commit offsets prior to rebalancing. Given that this rb is about merging in the OffsetManager,
should those checks be added here or in a later rb?
> 
> Onur Karaman wrote:
>     My bad. I missed your generation id check in handleCommitOffsets. But I'm still curious
about the generation id bump placement with respect to committing offsets before providing
a JoinGroupRequest.

That is a good point. I think we should postpone the generation id bump from prepareRebalance()
to rebalance(), before the line of group.transitionTo(Rebalancing). Does that sound right
to you?


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------


On June 30, 2015, 1:44 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35231/
> -----------------------------------------------------------
> 
> (Updated June 30, 2015, 1:44 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1740
>     https://issues.apache.org/jira/browse/KAFKA-1740
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> v2
> 
> 
> minor
> 
> 
> coordinator response test
> 
> 
> comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
6c26667182d7fa8153469a634881a7c34d8a0c91 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662

>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec

>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b5e8a0ff0aaf9df382b91f93e7ad51b1ed5f6209

>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3

>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
613b192ba84b66f79b45f3cd70418c3f503bee9e 
>   core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b

>   core/src/main/scala/kafka/cluster/Partition.scala 0990938b33ba7f3bccf373325dbbaee5e45ba8bb

>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60

>   core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8

>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala a385adbd7cb6ed693957df571d175ec36b8eaf94

>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 0cd5605bcca78dd8a80e8586e58f8b2529da97d7

>   core/src/main/scala/kafka/server/KafkaApis.scala ad6f05807c61c971e5e60d24bc0c87e989115961

>   core/src/main/scala/kafka/server/KafkaServer.scala 52dc728bb1ab4b05e94dc528da1006040e2f28c9

>   core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc

>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17

>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad

>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f

>   core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7

>   core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985

>   core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala a44fbd653b53649368db2656c3be3e14e3455163

>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a

>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d

> 
> Diff: https://reviews.apache.org/r/35231/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message