kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guozhang Wang" <wangg...@gmail.com>
Subject Re: Review Request 25995: Patch for KAFKA-1650
Date Wed, 03 Dec 2014 23:57:34 GMT


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613>
> >
> >     I think there may be a race condition here, for example consider this sequence:
> >     
> >     1. data channel only contain one message.
> >     2. producer take the message from channel.
> >     3. dataChannel.clear() called.
> >     4. numMessageUnacked.get() == 0, offsets committed.
> >     5. producer.send() called, increment numMessageUnacked.
> >     6. data duplicate happens when the rebalance finished.
> >     
> >     I think on line 599 we should use "while" instead of "if", but this alone does
not fix this.
> 
> Jiangjie Qin wrote:
>     Yes, I actually have comment on this race condition in line 581. The reason I'm not
handling it here is:
>     1. The chance of this situation is very slight.
>     2. A single duplicate message does not really hurt.
>     3. The fix increase the complexity of the code (looking into the producer thread
status) and I'm not sure if it worth doing.
>     4. Even if we fix this, from the producer side, duplicates could still happen.

Shall we change line 691 to "while (numMessageUnacked.get() > 0)" at least?


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
-----------------------------------------------------------


On Dec. 3, 2014, 11:02 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 3, 2014, 11:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6

>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION

>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984

>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a

>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58

>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0

> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message