kafka-jira mailing list archives

From "Evan Pollan (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5875) Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header
Date Tue, 12 Sep 2017 15:34:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163128#comment-16163128 ]

Evan Pollan commented on KAFKA-5875:
------------------------------------

It's worth mentioning that we are letting the consumer compute a default client ID.  IIRC, it uses the hostname.  In our case, a Kubernetes pod has a hostname like {{name-GENERATION_HASH-POD_HASH}}, e.g. {{fetch-2420457278-dp3gf}}.  So, restarting the container that backs the pod means the JVM comes up in a new container with the same hostname as before.
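
If we end up pinning it ourselves, here's a minimal sketch of setting {{client.id}} explicitly rather than relying on the default (the class name, broker address, and UUID suffix are illustrative, not from our actual deployment; the group ID is the one from the logs below):

{code}
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExplicitClientIdConsumer {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "conv-fetch-jobs-runner-for-internal");
        // Pin client.id so it no longer depends on whatever hostname the
        // container happens to report when the JVM starts.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "fetch-consumer-" + UUID.randomUUID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}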

> Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Evan Pollan
>
> I've seen this maybe once a month in our large cluster of Kubernetes-based Kafka consumers & producers.  Every once in a while a consumer in a Kubernetes "pod" gets this error trying to join a consumer group:
> {code}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
version : 0.11.0.0","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
commitId : cb8625948210849f","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking
previously assigned partitions [] for group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining
group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully
joined group conv-fetch-jobs-runner-for-internal with generation 17297","exception":""}
> {"errorType":"Error reading field 'version': java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException:
Error reading field 'version': java.nio.BufferUnderflowException\n\tat org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
> {code}
> Pardon the log format -- these get sucked into logstash, thus the JSON.
> Here's the raw stacktrace: 
> {code}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
> 	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
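
(For context on what that exception means here: if I'm reading the consumer protocol schema right, the assignment bytes are expected to start with a 16-bit {{version}} field, so an empty or truncated buffer fails on the very first read.  A minimal standalone sketch of that failure mode, using plain {{java.nio}} rather than the actual Kafka schema classes:)

{code}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class UnderflowDemo {
    public static void main(String[] args) {
        // An empty buffer standing in for a missing or undersized assignment payload.
        ByteBuffer assignment = ByteBuffer.allocate(0);
        try {
            short version = assignment.getShort(); // the first field the protocol reads
            System.out.println("version = " + version);
        } catch (BufferUnderflowException e) {
            // This is the condition that surfaces as
            // "Error reading field 'version': java.nio.BufferUnderflowException".
            System.out.println("underflow reading the version field: " + e);
        }
    }
}
{code}
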
> What's fascinating about this is:
> * We have a liveness probe (the Kubernetes term for a healthcheck whose failure causes the container backing the "pod" to be killed and restarted) attached to the existence of dead consumers.  When this situation happens, it _never_ resolves itself.  Today, I found a pod that had been restarted 1023 times due to this error.
> * The only way to make it go away is to _delete_ the Kubernetes pod.  This causes it to be replaced by a pod on another Kubernetes host ("minion") using the same docker image and configuration.  Invariably, this pod comes up and all consumers join just fine.
> * We have _not_ tried restarting the brokers when this happens.
> * There must be something about the pod, container, or Kubernetes host that is consistent across pod crash loops and factors into the consumer group join process -- MAC?  hostname?  It can't be anything that is recomputed on JVM restart, though... (a quick check of what the JVM reports is sketched just below this list)
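
A quick way to check the hostname part of that theory is to log what the JVM actually sees at startup; a tiny sketch (the class name is made up and the printout format is arbitrary):

{code}
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostIdentityCheck {
    public static void main(String[] args) throws UnknownHostException {
        InetAddress local = InetAddress.getLocalHost();
        // In a Kubernetes pod this is normally the pod name, which stays the same
        // when the container inside the pod is restarted.
        System.out.println("hostname = " + local.getHostName());
        System.out.println("address  = " + local.getHostAddress());
    }
}
{code}
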
> Seems like there's either:
> # a bug in the client (i.e. in its assumption that it can deserialize a protocol header on successful return of that join future) -- maybe there's a flavor of broker response that doesn't include this header?
> # a bug in the broker, in that it's sending an empty or undersized response to a group join command in some situations.
> It's worth noting that the severity of this issue is magnified by the fact that it requires manual intervention.  It wouldn't be so bad if our healthcheck failure tripped a pod restart and the new JVM's consumers then joined OK.  But the fact that even a JVM restart doesn't fix it means most resiliency plays won't work.
> BTW, I see a similar schema read failure in https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is completely different (admin {{ConsumerGroupCommand}}).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
