kafka-users mailing list archives

From Ian Friedman <...@flurry.com>
Subject Re: Offset committing on rebalance
Date Wed, 21 Aug 2013 03:52:57 GMT
Hey, just reporting that the ZK disconnect tip in the FAQ was in fact right on the money. After tweaking our GC settings and ZK timeout settings, I'm no longer seeing the flood of rebalances.
 

--  
Ian Friedman
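
For anyone landing on this thread with the same symptom: the knobs involved are the consumer's ZooKeeper session timeout and the garbage collector on the consumer JVMs. A minimal sketch of the kind of change meant here, using the 0.7-era consumer property names (the quorum, timeout values, and GC flags are illustrative assumptions, not recommendations):

    # consumer properties (Kafka 0.7 high-level consumer)
    zk.connect=zk1:2181,zk2:2181,zk3:2181    # assumed ZooKeeper quorum
    zk.sessiontimeout.ms=12000               # keep well above the worst-case GC pause
    zk.connectiontimeout.ms=12000

    # JVM flags on the consumer processes, to shorten stop-the-world pauses
    -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70

The failure mode this guards against: a GC pause longer than the session timeout lets ZooKeeper expire the consumer's ephemeral node under /consumers/&lt;group&gt;/ids, every consumer in the group sees the change, and the whole group rebalances; with ~400 consumers that turns into the flood of rebalances described above.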


On Tuesday, August 20, 2013 at 2:26 AM, Ian Friedman wrote:

> Sorry, ignore that first exception, I believe that was caused by an actual manual shutdown. The NoNode exception, though, has been popping up a lot, and I am not sure if it's relevant, but it seems to show up a bunch when the consumers decide it's time to rebalance continuously.
 
>  
> --  
> Ian Friedman
>  
>  
> On Tuesday, August 20, 2013 at 2:17 AM, Ian Friedman wrote:
>  
> > That's not it either. I just had all my consumers shut down on me with this:  
> >  
> > INFO  21:51:13,948 () ZkUtils$ - conflict in /consumers/flurry1/owners/dataLogPaths/1-183 data: flurry1_hs1030-1376964634130-dcc9192a-0 stored data: flurry1_hs1061-1376964609207-4b7f348b-0
> > INFO  21:51:13,948 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a waiting for the partition ownership to be deleted: 1-183
> > INFO  21:51:13,950 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a flurry1_hs1030-1376964634130-dcc9192a-0 successfully owned partition 1-180 for topic dataLogPaths
> >  
> >  
> >  
> > and I've also been seeing:
> >  
> > INFO  21:51:15,971 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a begin rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3
> > INFO  21:51:16,038 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a exception during rebalance
> > org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
> >         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
> >         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:162)
> >         at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:66)
> >         at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:259)
> >         at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:258)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >         at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >         at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
> >         at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:258)
> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:478)
> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:449)
> >         at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:285)
> >         at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444)
> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401)
> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1151)
> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1180)
> >         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >         ... 19 more
> > INFO  21:51:16,039 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a end rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3
> >  
> >  
> > any ideas?  
> >  
> > --  
> > Ian Friedman
> >  
> >  
> > On Monday, August 19, 2013 at 11:58 PM, Jun Rao wrote:
> >  
> > > Any failure/restart of a consumer or a broker can also trigger a rebalance.
> > >  
> > > Thanks,
> > >  
> > > Jun
> > >  
> > >  
> > > On Mon, Aug 19, 2013 at 6:00 PM, Ian Friedman <ian@flurry.com> wrote:
> > >  
> > > > Jun, I read that FAQ entry you linked, but I am not seeing any Zookeeper connection loss in the logs. It's rebalancing multiple times per minute, though. Any idea what else could cause this? We're running kafka 0.7.2 on approx 400 consumers against a topic with 400 partitions * 3 brokers.
> > > >  
> > > > --
> > > > Ian Friedman
> > > >  
> > > >  
> > > > On Thursday, August 15, 2013 at 11:52 AM, Jun Rao wrote:
> > > >  
> > > > > Yes, during rebalances, messages could be re-delivered since the new owner of a partition starts fetching from the last checkpointed offset in ZK.
> > > > >  
> > > > > For reasons on why rebalances happen a lot, see https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog%3F
> > > > >  
> > > > > Thanks,
> > > > >  
> > > > > Jun
> > > > >  
> > > > >  
> > > > > On Thu, Aug 15, 2013 at 8:36 AM, Ian Friedman <ian@flurry.com> wrote:
> > > > >  
> > > > > > It's a simple enough patch, but wouldn't this mean that messages still in process when a rebalance happens could get delivered to another consumer if we end up losing the partition? Rebalances seem to happen very frequently with a lot of consumers for some reason… And it doesn't seem like a consumer is guaranteed or likely to retain ownership of a partition it's in the middle of consuming after a rebalance.
> > > > > >  
> > > > > > --
> > > > > > Ian Friedman
> > > > > >  
> > > > > >  
> > > > > > On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:
> > > > > >  
> > > > > > > We are only patching blocker issues in 0.7. 0.8 beta1 has been released and most dev effort will be on 0.8 and beyond. That said, this particular case is easy to fix. If you can port the patch in https://issues.apache.org/jira/browse/KAFKA-919 to the 0.7 branch, we can commit that to the 0.7 branch.
> > > > > > >  
> > > > > > > Thanks,
> > > > > > >  
> > > > > > > Jun
> > > > > > >  
> > > > > > >  
> > > > > > > On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <ian@flurry.com> wrote:
> > > > > > >  
> > > > > > > > Ugh.
> > > > > > > >  
> > > > > > > > Is there any way to make this work in 0.7, or is transitioning to 0.8 the only way? My operations engineers spent a lot of effort in configuring and hardening our 0.7 production install, and 0.8 isn't released yet. Not to mention having to integrate the new client side code.
> > > > > > > >  
> > > > > > > > Either way, thanks for all your help Jun.
> > > > > > > >  
> > > > > > > > --
> > > > > > > > Ian Friedman
> > > > > > > >  
> > > > > > > >  
> > > > > > > > On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
> > > > > > > >  
> > > > > > > > > Yes, this is an issue and has been fixed in 0.8.
> > > > > > > > >  
> > > > > > > > > Thanks,
> > > > > > > > >  
> > > > > > > > > Jun
> > > > > > > > >  
> > > > > > > > >  
> > > > > > > > > On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <ian@flurry.com> wrote:
> > > > > > > > >  
> > > > > > > > > > Hey guys,
> > > > > > > > > >  
> > > > > > > > > > I designed my consumer app (running on 0.7) to run with autocommit off and commit manually once it was done processing a record. The intent was that if a consumer died while processing a message, the offset would not be committed, and another box would pick up the partition and reprocess the message. This seemed to work fine with small numbers of consumers (~10). But now that I'm scaling it out, I'm running into a problem where it looks like messages that consumers picked up and then errored on are not getting processed on another machine.
> > > > > > > > > >  
> > > > > > > > > > After investigating the logs and the partition offsets in zookeeper, I found that closeFetchersForQueues in ZookeeperConsumerConnector.scala, called during the rebalance process, will commit the offset regardless of the autocommit setting. So it looks like even if my consumer is in the middle of processing a message, the offset will be committed, and even if the processing fails, the message will never be picked up again. Now that I have a lot of consumer nodes, the rebalancer is going off a lot more often and I'm running into this constantly.
> > > > > > > > > >  
> > > > > > > > > > Were my assumptions faulty? Did I design this wrong? After reading the comment in the code I understand that if it didn't commit the offset there, the message would just get immediately consumed by whoever ended up owning the partition, even if we were in the middle of consuming it elsewhere, and we'd get unintentional duplicate delivery. How can I make it work the way I've described? Is there any way?
> > > > > > > > > >  
> > > > > > > > > > Thanks in advance,
> > > > > > > > > >  
> > > > > > > > > > --
> > > > > > > > > > Ian Friedman
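
For completeness, the design described in the original message at the bottom of this thread (autocommit off, manual commit once a record is fully processed) corresponds roughly to the sketch below against the 0.7 high-level consumer's Java API. The class name, ZooKeeper quorum, and exact import paths are illustrative assumptions from memory of that era's API; the group and topic names are taken from the logs above.

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ManualCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181"); // assumed ZK quorum
            props.put("groupid", "flurry1");
            props.put("autocommit.enable", "false"); // commit by hand, not on a timer

            ConsumerConnector connector =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // ... create message streams for topic "dataLogPaths" and iterate over them ...
            // After a record has been fully processed:
            connector.commitOffsets();

            connector.shutdown();
        }
    }

Two caveats that this thread turns on: commitOffsets() checkpoints the current consumed position of every partition the connector owns, not just the partition the processed record came from; and, as described above, the 0.7 rebalance path (closeFetchersForQueues) also commits unconditionally, so a record still being processed when a rebalance fires can have its offset committed anyway. That is the behavior KAFKA-919 and 0.8 address.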

